3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
28 class TransportPCEtesting(unittest.TestCase):
# Seconds passed to time.sleep() by the tests after a service create/delete request.
WAITING = 20 # nominal value is 300
# Version string handed to test_utils.start_sims and test_utils.mount_device.
NODE_VERSION = '2.2.1'

# Body sent to test_utils.service_create_request. It starts out describing
# "service1-OCH-OTU4"; test_23 and test_29 modify it in place before
# re-sending it, so the tests that use it depend on their execution order.
cr_serv_sample_data = {"input": {
    "sdnc-request-header": {
    "request-id": "request-1",
    "rpc-action": "service-create",
    "request-system-id": "appname"
    "service-name": "service1-OCH-OTU4",
    "common-id": "commonId",
    "connection-type": "infrastructure",
    "service-rate": "100",
    "node-id": "SPDR-SA1",
    "service-format": "OTU",
    "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
    "committed-info-rate": "100000",
    "committed-burst-size": "64"
    "port-device-name": "SPDR-SA1-XPDR1",
    "port-name": "XPDR1-NETWORK1",
    "port-rack": "000000.00",
    "port-shelf": "Chassis#1"
    "lgx-device-name": "Some lgx-device-name",
    "lgx-port-name": "Some lgx-port-name",
    "lgx-port-rack": "000000.00",
    "lgx-port-shelf": "00"
    "port-device-name": "SPDR-SA1-XPDR1",
    "port-name": "XPDR1-NETWORK1",
    "port-rack": "000000.00",
    "port-shelf": "Chassis#1"
    "lgx-device-name": "Some lgx-device-name",
    "lgx-port-name": "Some lgx-port-name",
    "lgx-port-rack": "000000.00",
    "lgx-port-shelf": "00"
    "service-rate": "100",
    "node-id": "SPDR-SC1",
    "service-format": "OTU",
    "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
    "committed-info-rate": "100000",
    "committed-burst-size": "64"
    "port-device-name": "SPDR-SC1-XPDR1",
    "port-type": "fixed",
    "port-name": "XPDR1-NETWORK1",
    "port-rack": "000000.00",
    "port-shelf": "Chassis#1"
    "lgx-device-name": "Some lgx-device-name",
    "lgx-port-name": "Some lgx-port-name",
    "lgx-port-rack": "000000.00",
    "lgx-port-shelf": "00"
    "port-device-name": "SPDR-SC1-XPDR1",
    "port-type": "fixed",
    "port-name": "XPDR1-NETWORK1",
    "port-rack": "000000.00",
    "port-shelf": "Chassis#1"
    "lgx-device-name": "Some lgx-device-name",
    "lgx-port-name": "Some lgx-port-name",
    "lgx-port-rack": "000000.00",
    "lgx-port-shelf": "00"
    "due-date": "2018-06-15T00:00:01Z",
    "operator-contact": "pw1234"
# Both calls assign cls.processes, so the attribute ends up holding only the
# value returned by start_sims; tearDownClass iterates over that value.
# NOTE(review): confirm start_sims returns a collection that also contains
# whatever start_tpce started, otherwise those processes are never shut down.
cls.processes = test_utils.start_tpce()
cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
                                       ('roadma', cls.NODE_VERSION),
                                       ('roadmc', cls.NODE_VERSION),
                                       ('spdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Hand every entry of cls.processes to test_utils.shutdown_process,
    # then print a completion message.
    # pylint: disable=not-an-iterable
    started = cls.processes
    for proc in started:
        test_utils.shutdown_process(proc)
    print("all processes killed")
def test_01_connect_spdrA(self):
    # Mount node "SPDR-SA1" against the 'spdra' simulator and require the
    # status code requests.codes.created.
    simulator = ('spdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SA1", simulator)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_spdrC(self):
    # Mount node "SPDR-SC1" against the 'spdrc' simulator and require the
    # status code requests.codes.created.
    simulator = ('spdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SC1", simulator)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    # Mount node "ROADM-A1" against the 'roadma' simulator and require the
    # status code requests.codes.created.
    simulator = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-A1", simulator)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    # Mount node "ROADM-C1" against the 'roadmc' simulator and require the
    # status code requests.codes.created.
    simulator = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-C1", simulator)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
    # Request the xponder-to-ROADM link between SPDR-SA1 and ROADM-A1
    # (SRG1-PP1-TXRX) and check the success message in the RPC output.
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
    # Request the ROADM-to-xponder link between ROADM-A1 (SRG1-PP1-TXRX)
    # and SPDR-SA1 and check the success message in the RPC output.
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
    # Request the xponder-to-ROADM link between SPDR-SC1 and ROADM-C1
    # (SRG1-PP1-TXRX) and check the success message in the RPC output.
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
    # Request the ROADM-to-xponder link between ROADM-C1 (SRG1-PP1-TXRX)
    # and SPDR-SC1 and check the success message in the RPC output.
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_09_add_omsAttributes_ROADMA_ROADMC(self):
    # Config ROADMA-ROADMC oms-attributes
    # The span attributes below are sent with test_utils.add_oms_attr_request
    # for the ROADM-A1 DEG2 -> ROADM-C1 DEG1 link; the reply must carry
    # requests.codes.created.
        "auto-spanloss": "true",
        "spanloss-base": 11.4,
        "spanloss-current": 12,
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
        "SRLG-length": 100000,
    response = test_utils.add_oms_attr_request(
        "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
    self.assertEqual(response.status_code, requests.codes.created)
def test_10_add_omsAttributes_ROADMC_ROADMA(self):
    # Config ROADMC-ROADMA oms-attributes
    # The span attributes below are sent with test_utils.add_oms_attr_request
    # for the ROADM-C1 DEG1 -> ROADM-A1 DEG2 link; the reply must carry
    # requests.codes.created.
        "auto-spanloss": "true",
        "spanloss-base": 11.4,
        "spanloss-current": 12,
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
        "SRLG-length": 100000,
    response = test_utils.add_oms_attr_request(
        "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
    self.assertEqual(response.status_code, requests.codes.created)
242 # test service-create for OCH-OTU4 service from spdr to spdr
def test_11_check_otn_topology(self):
    # The OTN topology must list exactly 6 nodes and must not yet contain
    # an 'ietf-network-topology:link' entry.
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    payload = topo_reply.json()
    node_count = len(payload['network'][0]['node'])
    self.assertEqual(node_count, 6, 'There should be 6 nodes')
    first_network = payload['network'][0]
    self.assertNotIn('ietf-network-topology:link', first_network)
def test_12_create_OCH_OTU4_service(self):
    # Submit the shared service-create request unchanged, check the
    # acknowledgement text, then sleep for WAITING seconds.
    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_13_get_OCH_OTU4_service1(self):
    # Fetch "service1-OCH-OTU4" from the service list; the lines below pair
    # fields of the first returned service with their expected values.
    response = test_utils.get_service_list_request(
        "services/service1-OCH-OTU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
        res['services'][0]['administrative-state'], 'inService')
        res['services'][0]['service-name'], 'service1-OCH-OTU4')
        res['services'][0]['connection-type'], 'infrastructure')
        res['services'][0]['lifecycle-state'], 'planned')
274 # Check correct configuration of devices
def test_14_check_interface_och_spdra(self):
    # Read interface XPDR1-NETWORK1-761:768 from SPDR-SA1 and check its
    # general attributes and its optical-channel container.
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so an equality check against the
    # device data only proves the expected keys exist, not their values.
    self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
                               'administrative-state': 'inService',
                               'supporting-circuit-pack-name': 'CP1-CFP0',
                               'type': 'org-openroadm-interfaces:opticalChannel',
                               'supporting-port': 'CP1-CFP0-P1'
                               }, **res['interface'][0]),
    # The och container is compared as a whole dict (exact match).
    self.assertDictEqual(
        {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
         'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
        res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
def test_15_check_interface_OTU4_spdra(self):
    # Read interface XPDR1-NETWORK1-OTU from SPDR-SA1; input_dict_1 holds the
    # expected general attributes, input_dict_2 the expected otu container.
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'supporting-interface': 'XPDR1-NETWORK1-761:768',
                    'type': 'org-openroadm-interfaces:otnOtu',
                    'supporting-port': 'CP1-CFP0-P1'
    input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
                    'expected-dapi': 'Swfw02qXGyI=',
                    'rate': 'org-openroadm-otn-common-types:OTU4',
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so an equality check against the
    # device data only proves the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
    self.assertDictEqual(input_dict_2,
                         ['org-openroadm-otn-otu-interfaces:otu'])
def test_16_check_interface_och_spdrc(self):
    # Read interface XPDR1-NETWORK1-761:768 from SPDR-SC1 and check its
    # general attributes and its optical-channel container.
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-NETWORK1-761:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so an equality check against the
    # device data only proves the expected keys exist, not their values.
    self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
                               'administrative-state': 'inService',
                               'supporting-circuit-pack-name': 'CP1-CFP0',
                               'type': 'org-openroadm-interfaces:opticalChannel',
                               'supporting-port': 'CP1-CFP0-P1'
                               }, **res['interface'][0]),
    # The och container is compared as a whole dict (exact match).
    self.assertDictEqual(
        {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
         'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
        res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
def test_17_check_interface_OTU4_spdrc(self):
    # Read interface XPDR1-NETWORK1-OTU from SPDR-SC1; input_dict_1 holds the
    # expected general attributes, input_dict_2 the expected otu container
    # (here with both tx/expected SAPI and DAPI values).
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'supporting-interface': 'XPDR1-NETWORK1-761:768',
                    'type': 'org-openroadm-interfaces:otnOtu',
                    'supporting-port': 'CP1-CFP0-P1'
    input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
                    'expected-sapi': 'Swfw02qXGyI=',
                    'tx-sapi': 'fuYZwEO660g=',
                    'expected-dapi': 'fuYZwEO660g=',
                    'rate': 'org-openroadm-otn-common-types:OTU4',
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so an equality check against the
    # device data only proves the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
    self.assertDictEqual(input_dict_2,
                         ['org-openroadm-otn-otu-interfaces:otu'])
def test_18_check_no_interface_ODU4_spdra(self):
    # Before the ODU4 service exists, reading XPDR1-NETWORK1-ODU4 on SPDR-SA1
    # must answer requests.codes.conflict; the dict below is the expected
    # "data-missing" entry looked for in res['errors']['error'].
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.conflict)
    res = response.json()
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_19_check_openroadm_topo_spdra(self):
    # Read node SPDR-SA1-XPDR1 from the openroadm topology and inspect its
    # first termination point: tp-id must be XPDR1-NETWORK1, and its
    # xpdr-network-attributes 'wavelength' is compared with the dict below.
    response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    ele = res['node'][0]['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
    self.assertEqual({'frequency': 196.1,
                     ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
def test_20_check_openroadm_topo_ROADMA_SRG(self):
    # Read node ROADM-A1-SRG1 from the openroadm topology and check the
    # base64-encoded frequency maps of the SRG and of its termination points.
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Decode the map to bytes, turn it into a list of ints and require the
    # value at index 95 to be 0.
    freq_map = base64.b64decode(
        res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
    freq_map_array = [int(x) for x in freq_map]
    self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
        # Same index-95 check on the pp-attributes map of SRG1-PP1-TXRX.
        if ele['tp-id'] == 'SRG1-PP1-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = [int(x) for x in freq_map]
            self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
        # SRG1-PP2-TXRX must have no 'avail-freq-maps' key at its top level.
        if ele['tp-id'] == 'SRG1-PP2-TXRX':
            self.assertNotIn('avail-freq-maps', dict.keys(ele))
def test_21_check_openroadm_topo_ROADMA_DEG(self):
    # Read node ROADM-A1-DEG2 from the openroadm topology and check the
    # base64-encoded frequency maps of the degree and of its CTP/TTP
    # termination points: the value at index 95 must be 0 in each.
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    freq_map = base64.b64decode(
        res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    freq_map_array = [int(x) for x in freq_map]
    self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = [int(x) for x in freq_map]
            self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = [int(x) for x in freq_map]
            self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
def test_22_check_otn_topo_otu4_links(self):
    # After the OCH-OTU4 service, the OTN topology must hold exactly 2 links,
    # both named in listLinkId; the lines below pair link fields with their
    # expected values.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nb_links = len(res['network'][0]['ietf-network-topology:link'])
    self.assertEqual(nb_links, 2)
    listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
                  'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
    for link in res['network'][0]['ietf-network-topology:link']:
        self.assertIn(link['link-id'], listLinkId)
            link['transportpce-topology:otn-link-type'], 'OTU4')
            link['org-openroadm-common-network:link-type'], 'OTN-LINK')
            link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
            link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
            link['org-openroadm-common-network:opposite-link'], listLinkId)
445 # test service-create for ODU4 service from spdr to spdr
def test_23_create_ODU4_service(self):
    # Rework the shared request in place into an ODU4 service: rename it,
    # then on both ends switch the format to ODU and replace the
    # otu-service-rate key with an odu-service-rate key.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1-ODU4"
    for side in ("service-a-end", "service-z-end"):
        endpoint = request_input[side]
        endpoint["service-format"] = "ODU"
        del endpoint["otu-service-rate"]
        endpoint["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"

    # Submit it, check the acknowledgement text, then sleep WAITING seconds.
    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_24_get_ODU4_service1(self):
    # Fetch "service1-ODU4" from the service list; the lines below pair
    # fields of the first returned service with their expected values.
    response = test_utils.get_service_list_request(
        "services/service1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
        res['services'][0]['administrative-state'], 'inService')
        res['services'][0]['service-name'], 'service1-ODU4')
        res['services'][0]['connection-type'], 'infrastructure')
        res['services'][0]['lifecycle-state'], 'planned')
def test_25_check_interface_ODU4_spdra(self):
    # Read interface XPDR1-NETWORK1-ODU4 from SPDR-SA1; input_dict_1 holds the
    # expected general attributes, input_dict_2 the expected odu container,
    # and the opu sub-container must equal the payload-type dict exactly.
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'supporting-interface': 'XPDR1-NETWORK1-OTU',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-CFP0-P1'}
    # SAPI/DAPI are added in the Otu4 renderer
    input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
                    'rate': 'org-openroadm-otn-common-types:ODU4',
                    'expected-dapi': 'Swfw02qXGyI=',
                    'expected-sapi': 'fuYZwEO660g=',
                    'tx-dapi': 'fuYZwEO660g=',
                    'tx-sapi': 'Swfw02qXGyI='}
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so an equality check against the
    # device data only proves the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
    self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'},
        res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_26_check_interface_ODU4_spdrc(self):
    # Read interface XPDR1-NETWORK1-ODU4 from SPDR-SC1; input_dict_1 holds the
    # expected general attributes, input_dict_2 the expected odu container,
    # and the opu sub-container must equal the payload-type dict exactly.
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'supporting-interface': 'XPDR1-NETWORK1-OTU',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-CFP0-P1'}
    # SAPI/DAPI are added in the Otu4 renderer
    input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
                    'rate': 'org-openroadm-otn-common-types:ODU4',
                    'tx-sapi': 'fuYZwEO660g=',
                    'tx-dapi': 'Swfw02qXGyI=',
                    'expected-sapi': 'Swfw02qXGyI=',
                    'expected-dapi': 'fuYZwEO660g='
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so an equality check against the
    # device data only proves the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
    self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'},
        res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_27_check_otn_topo_links(self):
    # After the ODU4 service, the OTN topology must hold 4 links. Links whose
    # id starts with 'OTU4-' and links whose id starts with 'ODTU4-' are
    # checked against different bandwidth figures (paired values below);
    # self.fail() carries the message "this link should not exist".
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nb_links = len(res['network'][0]['ietf-network-topology:link'])
    self.assertEqual(nb_links, 4)
    for link in res['network'][0]['ietf-network-topology:link']:
        linkId = link['link-id']
        if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
                       'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
                link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
                link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
        elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
                         'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
                link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
                link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
                link['transportpce-topology:otn-link-type'], 'ODTU4')
                link['org-openroadm-common-network:link-type'], 'OTN-LINK')
            self.assertIn(link['org-openroadm-common-network:opposite-link'],
                          ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
                           'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
            self.fail("this link should not exist")
def test_28_check_otn_topo_tp(self):
    # For nodes SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1 in the OTN topology, check
    # the XPDR1-NETWORK1 termination point: ts-pool must have 80 entries, the
    # first odtu-tpn-pool is paired with 80, and its odtu-type must be
    # ODTU4.ts-Allocated.
    # NOTE(review): unlike the sibling topology tests, the status code of the
    # response is not checked before response.json() is read.
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            tpList = node['ietf-network-topology:termination-point']
                if tp['tp-id'] == 'XPDR1-NETWORK1':
                    xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
                    self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
                        len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
                    self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
                                     'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
580 # test service-create for 10GE service from spdr to spdr
def test_29_create_10GE_service(self):
    # Rework the shared request in place into a 10GE Ethernet service: rename
    # it, set the connection type, then on both ends set rate and format,
    # drop the odu-service-rate key and point tx/rx ports at XPDR1-CLIENT1.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1-10GE"
    request_input["connection-type"] = "service"
    for side in ("service-a-end", "service-z-end"):
        endpoint = request_input[side]
        endpoint["service-rate"] = "10"
        endpoint["service-format"] = "Ethernet"
        del endpoint["odu-service-rate"]
        endpoint["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
        endpoint["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
    # Submit it, check the acknowledgement text, then sleep WAITING seconds.
    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_30_get_10GE_service1(self):
    # Fetch "service1-10GE" from the service list; the lines below pair
    # fields of the first returned service with their expected values.
    response = test_utils.get_service_list_request(
        "services/service1-10GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
        res['services'][0]['administrative-state'], 'inService')
        res['services'][0]['service-name'], 'service1-10GE')
        res['services'][0]['connection-type'], 'service')
        res['services'][0]['lifecycle-state'], 'planned')
def test_31_check_interface_10GE_CLIENT_spdra(self):
    # Read interface XPDR1-CLIENT1-ETHERNET10G from SPDR-SA1; input_dict holds
    # the expected general attributes, and the ethernet container is checked
    # by the second assertDictEqual.
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
                  'administrative-state': 'inService',
                  'supporting-circuit-pack-name': 'CP1-SFP4',
                  'type': 'org-openroadm-interfaces:ethernetCsmacd',
                  'supporting-port': 'CP1-SFP4-P1'
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so an equality check against the
    # device data only proves the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict, **res['interface'][0]),
    self.assertDictEqual(
        res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_32_check_interface_ODU2E_CLIENT_spdra(self):
    # Read interface XPDR1-CLIENT1-ODU2e-service1-10GE from SPDR-SA1 and check
    # its general attributes, its odu container and the opu payload types.
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-SFP4',
                    'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-SFP4-P1'}
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU2e',
        'monitoring-mode': 'terminated'}
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so the two equality checks below
    # only prove the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    # The opu container is compared as a whole dict (exact match).
    self.assertDictEqual(
        {'payload-type': '03', 'exp-payload-type': '03'},
        res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_33_check_interface_ODU2E_NETWORK_spdra(self):
    # Read interface XPDR1-NETWORK1-ODU2e-service1-10GE from SPDR-SA1 and
    # check its general attributes, its odu container and the
    # parent-odu-allocation (trib-port-number 1).
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'supporting-interface': 'XPDR1-NETWORK1-ODU4',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-CFP0-P1'}
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU2e',
        'monitoring-mode': 'monitored'}
    input_dict_3 = {'trib-port-number': 1}
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so the three equality checks below
    # only prove the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(dict(input_dict_3,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
                                  'parent-odu-allocation']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
    self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
def test_34_check_ODU2E_connection_spdra(self):
    # Read the odu-connection joining the CLIENT1 and NETWORK1 ODU2e
    # interfaces of service1-10GE and check its attributes, destination
    # (dst-if) and source (src-if).
    response = test_utils.check_netconf_node_request(
        "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
        'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
        'direction': 'bidirectional'
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so this equality check only proves
    # the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
                         res['odu-connection'][0])
    self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'},
                         res['odu-connection'][0]['destination'])
    self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'},
                         res['odu-connection'][0]['source'])
def test_35_check_interface_10GE_CLIENT_spdrc(self):
    # Read interface XPDR1-CLIENT1-ETHERNET10G from SPDR-SC1; input_dict holds
    # the expected general attributes, and the ethernet container is checked
    # by the second assertDictEqual.
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
                  'administrative-state': 'inService',
                  'supporting-circuit-pack-name': 'CP1-SFP4',
                  'type': 'org-openroadm-interfaces:ethernetCsmacd',
                  'supporting-port': 'CP1-SFP4-P1'
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so an equality check against the
    # device data only proves the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict, **res['interface'][0]),
    self.assertDictEqual(
        res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
    # Read interface XPDR1-CLIENT1-ODU2e-service1-10GE from SPDR-SC1 and check
    # its general attributes, its odu container and the opu payload types.
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-SFP4',
                    'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-SFP4-P1'}
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU2e',
        'monitoring-mode': 'terminated'}
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so the two equality checks below
    # only prove the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    # The opu container is compared as a whole dict (exact match).
    self.assertDictEqual(
        {'payload-type': '03', 'exp-payload-type': '03'},
        res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
    # Read interface XPDR1-NETWORK1-ODU2e-service1-10GE from SPDR-SC1 and
    # check its general attributes, its odu container and the
    # parent-odu-allocation (trib-port-number 1, trib-slots).
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'supporting-interface': 'XPDR1-NETWORK1-ODU4',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-CFP0-P1'}
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU2e',
        'monitoring-mode': 'monitored'}
    input_dict_3 = {'trib-port-number': 1}
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so the three equality checks below
    # only prove the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(dict(input_dict_3,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
                                  'parent-odu-allocation']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
                             'parent-odu-allocation'])
        'org-openroadm-otn-odu-interfaces:odu'][
        'parent-odu-allocation']['trib-slots'])
def test_38_check_ODU2E_connection_spdrc(self):
    # Read the odu-connection joining the CLIENT1 and NETWORK1 ODU2e
    # interfaces of service1-10GE and check its attributes, destination
    # (dst-if) and source (src-if).
    response = test_utils.check_netconf_node_request(
        "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
        'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
        'direction': 'bidirectional'
    # NOTE(review): dict(expected, **actual) lets the values read from the
    # device override the expected ones, so this equality check only proves
    # the expected keys exist, not their values.
    self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
                         res['odu-connection'][0])
    self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'},
                         res['odu-connection'][0]['destination'])
    self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'},
                         res['odu-connection'][0]['source'])
def test_39_check_otn_topo_links(self):
    # With the 10GE service up, the OTN topology must still hold 4 links; the
    # selected links are paired with available-bandwidth 90000 and
    # used-bandwidth 10000.
    # NOTE(review): test_27 only accepts link ids prefixed 'OTU4-' or
    # 'ODTU4-'. If the ids are unchanged at this point, the 'ODU4-' ids
    # tested here never match and the bandwidth checks are skipped -- confirm.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nb_links = len(res['network'][0]['ietf-network-topology:link'])
    self.assertEqual(nb_links, 4)
    for link in res['network'][0]['ietf-network-topology:link']:
        linkId = link['link-id']
        if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
                       'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
                link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
                link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
def test_40_check_otn_topo_tp(self):
    # For nodes SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1, check the XPDR1-NETWORK1
    # termination point while the 10GE service is up: ts-pool must have 72
    # entries, tsPoolList (1..8) and the value 1 are compared against the
    # ts-pool / tpn-pool, and the tpn-pool length is paired with 79.
    # NOTE(review): unlike the sibling topology tests, the status code of the
    # response is not checked before response.json() is read.
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            tpList = node['ietf-network-topology:termination-point']
                if tp['tp-id'] == 'XPDR1-NETWORK1':
                    xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
                    self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
                    tsPoolList = list(range(1, 9))
                        tsPoolList, xpdrTpPortConAt['ts-pool'])
                        len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
                        1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_41_delete_10GE_service(self):
    # Delete "service1-10GE", check the acknowledgement text, then sleep for
    # WAITING seconds.
    delete_reply = test_utils.service_delete_request("service1-10GE")
    self.assertEqual(delete_reply.status_code, requests.codes.ok)
    common = delete_reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_42_check_service_list(self):
    # The service list must contain exactly 2 services at this point.
    list_reply = test_utils.get_service_list_request("")
    self.assertEqual(list_reply.status_code, requests.codes.ok)
    remaining = list_reply.json()['service-list']['services']
    self.assertEqual(len(remaining), 2)
def test_43_check_no_ODU2e_connection_spdra(self):
    """Check that SPDR-SA1 no longer exposes any 'odu-connection' entry."""
    reply = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(reply.status_code, requests.codes.ok)
    device = reply.json()['org-openroadm-device']
    # Plain key lookup; the former ['odu-connection'][0] expression was
    # just a roundabout spelling of the same string.
    self.assertNotIn('odu-connection', device)
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
    """Check that the network-side ODU2e interface is gone on SPDR-SA1.

    The lookup is expected to answer with HTTP 409 (conflict).
    """
    # Use the complete interface name, including the '-10GE' service
    # suffix seen in the odu-connection check earlier in this file.  The
    # shorter name never existed, so querying it could not detect an
    # interface left behind by the deletion.
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
    """Check that the client-side ODU2e interface is gone on SPDR-SA1.

    The lookup is expected to answer with HTTP 409 (conflict).
    """
    # Use the complete interface name, including the '-10GE' service
    # suffix seen in the odu-connection check earlier in this file.  The
    # shorter name never existed, so querying it could not detect an
    # interface left behind by the deletion.
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
    """Check that the 10G Ethernet client interface is gone on SPDR-SA1.

    The lookup is expected to answer with HTTP 409 (conflict).
    """
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-CLIENT1-ETHERNET10G"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_47_check_otn_topo_links(self):
    """Check the XPDR1 ODU4 links after service1-10GE was deleted.

    The topology must still hold four links, and both directions of the
    ODU4 link between SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1 must report 100000
    available and 0 used bandwidth.
    """
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
    )
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    topo_links = reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(topo_links), 4)
    for topo_link in topo_links:
        # Links other than the two ODU4 directions are not inspected.
        if topo_link['link-id'] not in odu4_link_ids:
            continue
        self.assertEqual(
            topo_link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            topo_link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_48_check_otn_topo_tp(self):
    """Check the XPDR1-NETWORK1 pools after service1-10GE was deleted.

    For nodes SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1, both the 'ts-pool' and the
    first 'odtu-tpn-pool' of termination point XPDR1-NETWORK1 must hold 80
    entries again.
    """
    reply = test_utils.get_otn_topo_request()
    # Same guard as the sibling topology tests: report a bad HTTP status
    # as such instead of as a JSON/KeyError further down.
    self.assertEqual(reply.status_code, requests.codes.ok)
    for node in reply.json()['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            attributes = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(attributes['ts-pool']), 80)
            self.assertEqual(
                len(attributes['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
    """Request deletion of service1-ODU4 and wait WAITING seconds."""
    reply = test_utils.service_delete_request("service1-ODU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    # The RPC only reports the deletion as "in progress"; pause before the
    # following tests look at the result.
    time.sleep(self.WAITING)
def test_50_check_service_list(self):
    """Check that the service list now holds exactly one service."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 1)
def test_51_check_no_interface_ODU4_spdra(self):
    """Check that interface XPDR1-NETWORK1-ODU4 is gone on SPDR-SA1.

    The lookup is expected to answer with HTTP 409 (conflict).
    """
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-ODU4"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_52_check_otn_topo_links(self):
    """Re-run the link checks of test_22_check_otn_topo_otu4_links."""
    # Delegates entirely to the earlier test method of this class.
    self.test_22_check_otn_topo_otu4_links()
def test_53_check_otn_topo_tp(self):
    """Check that XPDR1-NETWORK1 lost its pools after service1-ODU4 deletion.

    For nodes SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1, the connection attributes
    of termination point XPDR1-NETWORK1 must contain neither 'ts-pool' nor
    'odtu-tpn-pool'.
    """
    reply = test_utils.get_otn_topo_request()
    # Same guard as the sibling topology tests: report a bad HTTP status
    # as such instead of as a JSON/KeyError further down.
    self.assertEqual(reply.status_code, requests.codes.ok)
    for node in reply.json()['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            attributes = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            # Membership on the dict itself tests its keys.
            self.assertNotIn('ts-pool', attributes)
            self.assertNotIn('odtu-tpn-pool', attributes)
def test_54_delete_OCH_OTU4_service(self):
    """Request deletion of service1-OCH-OTU4 and wait WAITING seconds."""
    reply = test_utils.service_delete_request("service1-OCH-OTU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    # The RPC only reports the deletion as "in progress"; pause before the
    # following tests look at the result.
    time.sleep(self.WAITING)
def test_55_get_no_service(self):
    """Check that reading the service list now yields a data-missing error.

    The request must answer HTTP 409 (conflict) and list the expected
    error entry under errors/error.
    """
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.conflict)
    reported_errors = reply.json()['errors']['error']
    expected_error = {
        "error-type": "application",
        "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant data model content does not exist",
    }
    self.assertIn(expected_error, reported_errors)
def test_56_check_no_interface_OTU4_spdra(self):
    """Check that interface XPDR1-NETWORK1-OTU is gone on SPDR-SA1.

    The lookup is expected to answer with HTTP 409 (conflict).
    """
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-OTU"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_57_check_no_interface_OCH_spdra(self):
    """Check that interface XPDR1-NETWORK1-1 is gone on SPDR-SA1.

    The lookup is expected to answer with HTTP 409 (conflict).
    """
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-1"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
    """Check that the OTN topology no longer contains any link list."""
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_59_check_openroadm_topo_spdra(self):
    """Check that XPDR1-NETWORK1 of SPDR-SA1-XPDR1 carries no wavelength.

    The first termination point of the node must be XPDR1-NETWORK1 and its
    xpdr-network-attributes must not contain a 'wavelength' key.
    """
    reply = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_tp = reply.json()['node'][0]['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR1-NETWORK1', first_tp['tp-id'])
    # Membership on the dict itself tests its keys; no dict.keys() needed.
    self.assertNotIn(
        'wavelength',
        first_tp['org-openroadm-network-topology:xpdr-network-attributes'])
def test_60_check_openroadm_topo_ROADMA_SRG(self):
    """Check that byte 95 of the ROADM-A1-SRG1 frequency maps is 255.

    Both the SRG-level map and the map of termination point SRG1-PP1-TXRX
    are base64-decoded and inspected.
    """
    reply = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    srg_node = reply.json()['node'][0]
    srg_bytes = list(base64.b64decode(
        srg_node['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map']))
    self.assertEqual(srg_bytes[95], 255, "Lambda 1 should be available")
    for tp in srg_node['ietf-network-topology:termination-point']:
        if tp['tp-id'] != 'SRG1-PP1-TXRX':
            continue
        pp_bytes = list(base64.b64decode(
            tp['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map']))
        self.assertEqual(pp_bytes[95], 255, "Lambda 1 should be available")
def test_61_check_openroadm_topo_ROADMA_DEG(self):
    """Check that byte 95 of the ROADM-A1-DEG2 frequency maps is 255.

    The degree-level map and the maps of termination points DEG2-CTP-TXRX
    and DEG2-TTP-TXRX are base64-decoded and inspected.
    """
    # Attribute container that holds the frequency map for each inspected TP.
    attributes_by_tp = {
        'DEG2-CTP-TXRX': 'org-openroadm-network-topology:ctp-attributes',
        'DEG2-TTP-TXRX': 'org-openroadm-network-topology:tx-ttp-attributes',
    }
    reply = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    degree_node = reply.json()['node'][0]
    degree_bytes = list(base64.b64decode(
        degree_node['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map']))
    self.assertEqual(degree_bytes[95], 255, "Lambda 1 should be available")
    for tp in degree_node['ietf-network-topology:termination-point']:
        attributes_key = attributes_by_tp.get(tp['tp-id'])
        if attributes_key is None:
            continue
        tp_bytes = list(base64.b64decode(
            tp[attributes_key]['avail-freq-maps'][0]['freq-map']))
        self.assertEqual(tp_bytes[95], 255, "Lambda 1 should be available")
def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
    """Create the xponder-to-ROADM link from SPDR-SA1 to ROADM-A1 SRG1-PP2."""
    reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SA1", "3", "1", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
    """Create the ROADM-to-xponder link from ROADM-A1 SRG1-PP2 to SPDR-SA1."""
    reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SA1", "3", "1", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
    """Create the xponder-to-ROADM link from SPDR-SC1 to ROADM-C1 SRG1-PP2."""
    reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SC1", "3", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
    """Create the ROADM-to-xponder link from ROADM-C1 SRG1-PP2 to SPDR-SC1."""
    reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SC1", "3", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_66_create_OCH_OTU4_service_2(self):
    """Create service2-OCH-OTU4 between XPDR3-NETWORK1 of SPDR-SA1 and SPDR-SC1.

    Rewrites the shared class-level request template in place, posts it,
    expects the "PCE calculation in progress" answer and waits WAITING
    seconds.
    """
    # pylint: disable=line-too-long
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service2-OCH-OTU4"
    request_input["connection-type"] = "infrastructure"
    # Both ends receive the same settings apart from the device name.
    for end_key, device_name in (("service-a-end", "SPDR-SA1-XPDR3"),
                                 ("service-z-end", "SPDR-SC1-XPDR3")):
        service_end = request_input[end_key]
        service_end["service-rate"] = "100"
        service_end["service-format"] = "OTU"
        for direction in ("tx-direction", "rx-direction"):
            port = service_end[direction]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-NETWORK1"
        service_end["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_67_get_OCH_OTU4_service2(self):
    """Read back service2-OCH-OTU4 and check its main attributes."""
    reply = test_utils.get_service_list_request(
        "services/service2-OCH-OTU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2-OCH-OTU4')
    self.assertEqual(service['connection-type'], 'infrastructure')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_68_create_ODU4_service_2(self):
    """Create service2-ODU4 between XPDR3-NETWORK1 of SPDR-SA1 and SPDR-SC1.

    Rewrites the shared class-level request template in place (switching
    both ends from the OTU rate key to an ODU4 rate key), posts it, expects
    the "PCE calculation in progress" answer and waits WAITING seconds.
    """
    # pylint: disable=line-too-long
    request_input = self.cr_serv_sample_data["input"]
    a_end = request_input["service-a-end"]
    z_end = request_input["service-z-end"]

    request_input["service-name"] = "service2-ODU4"

    a_end["service-format"] = "ODU"
    a_tx_port = a_end["tx-direction"]["port"]
    a_tx_port["port-device-name"] = "SPDR-SA1-XPDR3"
    a_tx_port["port-name"] = "XPDR3-NETWORK1"
    a_rx_port = a_end["rx-direction"]["port"]
    a_rx_port["port-device-name"] = "SPDR-SA1-XPDR3"
    a_rx_port["port-name"] = "XPDR3-NETWORK1"
    a_end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
    # Relies on the OTU rate key still being present in the template.
    del a_end["otu-service-rate"]

    z_end["service-format"] = "ODU"
    z_end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
    z_tx_port = z_end["tx-direction"]["port"]
    z_tx_port["port-device-name"] = "SPDR-SC1-XPDR3"
    z_tx_port["port-name"] = "XPDR3-NETWORK1"
    z_rx_port = z_end["rx-direction"]["port"]
    z_rx_port["port-device-name"] = "SPDR-SC1-XPDR3"
    z_rx_port["port-name"] = "XPDR3-NETWORK1"
    del z_end["otu-service-rate"]

    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_69_get_ODU4_service2(self):
    """Read back service2-ODU4 and check its main attributes."""
    reply = test_utils.get_service_list_request(
        "services/service2-ODU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2-ODU4')
    self.assertEqual(service['connection-type'], 'infrastructure')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_70_create_1GE_service(self):
    """Create service1-1GE between XPDR3-CLIENT1 of SPDR-SA1 and SPDR-SC1.

    Rewrites the shared class-level request template in place (Ethernet
    format, rate "1", no ODU rate key), posts it, expects the "PCE
    calculation in progress" answer and waits WAITING seconds.
    """
    # pylint: disable=line-too-long
    request_input = self.cr_serv_sample_data["input"]
    a_end = request_input["service-a-end"]
    z_end = request_input["service-z-end"]

    request_input["service-name"] = "service1-1GE"
    request_input["connection-type"] = "service"
    a_end["service-rate"] = "1"
    a_end["service-format"] = "Ethernet"
    z_end["service-rate"] = "1"
    z_end["service-format"] = "Ethernet"

    # Relies on the ODU rate key still being present in the template.
    del a_end["odu-service-rate"]
    a_tx_port = a_end["tx-direction"]["port"]
    a_tx_port["port-device-name"] = "SPDR-SA1-XPDR3"
    a_tx_port["port-name"] = "XPDR3-CLIENT1"
    a_rx_port = a_end["rx-direction"]["port"]
    a_rx_port["port-device-name"] = "SPDR-SA1-XPDR3"
    a_rx_port["port-name"] = "XPDR3-CLIENT1"

    del z_end["odu-service-rate"]
    z_tx_port = z_end["tx-direction"]["port"]
    z_tx_port["port-device-name"] = "SPDR-SC1-XPDR3"
    z_tx_port["port-name"] = "XPDR3-CLIENT1"
    z_rx_port = z_end["rx-direction"]["port"]
    z_rx_port["port-device-name"] = "SPDR-SC1-XPDR3"
    z_rx_port["port-name"] = "XPDR3-CLIENT1"

    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_71_get_1GE_service1(self):
    """Read back service1-1GE and check its main attributes."""
    reply = test_utils.get_service_list_request("services/service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1-1GE')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
1172 def test_72_check_interface_1GE_CLIENT_spdra(self):
1173 response = test_utils.check_netconf_node_request(
1174 "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1175 self.assertEqual(response.status_code, requests.codes.ok)
1176 res = response.json()
1177 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1178 'administrative-state': 'inService',
1179 'supporting-circuit-pack-name': 'CP1-SFP4',
1180 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1181 'supporting-port': 'CP1-SFP4-P1'
1183 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1184 res['interface'][0])
1185 self.assertDictEqual(
1187 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_73_check_interface_ODU0_CLIENT_spdra(self):
    """Check the client-side ODU0 interface of service1-1GE on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    interface = reply.json()['interface'][0]
    expected_interface = {
        'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
        'administrative-state': 'inService',
        'supporting-circuit-pack-name': 'CP3-SFP1',
        'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
        'type': 'org-openroadm-interfaces:otnOdu',
        'supporting-port': 'CP3-SFP1-P1',
    }
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'terminated',
    }
    # NOTE(review): in dict(expected, **actual) the device values override
    # the expected ones, so the next two assertions only fail when an
    # expected key is absent; the values themselves are not compared.
    # Confirm whether dict(actual, **expected) was intended.
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    self.assertDictEqual(
        {'payload-type': '07', 'exp-payload-type': '07'},
        interface['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_74_check_interface_ODU0_NETWORK_spdra(self):
    """Check the network-side ODU0 interface of service1-1GE on SPDR-SA1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    interface = reply.json()['interface'][0]
    expected_interface = {
        'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
        'administrative-state': 'inService',
        'supporting-circuit-pack-name': 'CP3-CFP0',
        'supporting-interface': 'XPDR3-NETWORK1-ODU4',
        'type': 'org-openroadm-interfaces:otnOdu',
        'supporting-port': 'CP3-CFP0-P1',
    }
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'monitored',
    }
    expected_allocation = {'trib-port-number': 1}
    # NOTE(review): in dict(expected, **actual) the device values override
    # the expected ones, so the next three assertions only fail when an
    # expected key is absent; the values themselves are not compared.
    # Confirm whether dict(actual, **expected) was intended.
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    allocation = odu['parent-odu-allocation']
    self.assertDictEqual(dict(expected_allocation, **allocation), allocation)
    # Same trib-slot check as the SPDR-SC1 counterpart of this test.
    self.assertIn(1, allocation['trib-slots'])
1243 def test_75_check_ODU0_connection_spdra(self):
1244 response = test_utils.check_netconf_node_request(
1246 "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1247 self.assertEqual(response.status_code, requests.codes.ok)
1248 res = response.json()
1251 'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1252 'direction': 'bidirectional'
1255 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1256 res['odu-connection'][0])
1257 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0-service1-1GE'},
1258 res['odu-connection'][0]['destination'])
1259 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0-service1-1GE'},
1260 res['odu-connection'][0]['source'])
1262 def test_76_check_interface_1GE_CLIENT_spdrc(self):
1263 response = test_utils.check_netconf_node_request(
1264 "SPDR-SC1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1265 self.assertEqual(response.status_code, requests.codes.ok)
1266 res = response.json()
1267 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1268 'administrative-state': 'inService',
1269 'supporting-circuit-pack-name': 'CP3-SFP1',
1270 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1271 'supporting-port': 'CP3-SFP1-P1'
1273 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1274 res['interface'][0])
1275 self.assertDictEqual(
1277 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_77_check_interface_ODU0_CLIENT_spdrc(self):
    """Check the client-side ODU0 interface of service1-1GE on SPDR-SC1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    interface = reply.json()['interface'][0]
    expected_interface = {
        'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
        'administrative-state': 'inService',
        'supporting-circuit-pack-name': 'CP3-SFP1',
        'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
        'type': 'org-openroadm-interfaces:otnOdu',
        'supporting-port': 'CP3-SFP1-P1',
    }
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'terminated',
    }
    # NOTE(review): in dict(expected, **actual) the device values override
    # the expected ones, so the next two assertions only fail when an
    # expected key is absent; the values themselves are not compared.
    # Confirm whether dict(actual, **expected) was intended.
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    self.assertDictEqual(
        {'payload-type': '07', 'exp-payload-type': '07'},
        interface['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_78_check_interface_ODU0_NETWORK_spdrc(self):
    """Check the network-side ODU0 interface of service1-1GE on SPDR-SC1."""
    reply = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    interface = reply.json()['interface'][0]
    expected_interface = {
        'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
        'administrative-state': 'inService',
        'supporting-circuit-pack-name': 'CP3-CFP0',
        'supporting-interface': 'XPDR3-NETWORK1-ODU4',
        'type': 'org-openroadm-interfaces:otnOdu',
        'supporting-port': 'CP3-CFP0-P1',
    }
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'monitored',
    }
    expected_allocation = {'trib-port-number': 1}
    # NOTE(review): in dict(expected, **actual) the device values override
    # the expected ones, so the next three assertions only fail when an
    # expected key is absent; the values themselves are not compared.
    # Confirm whether dict(actual, **expected) was intended.
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    allocation = odu['parent-odu-allocation']
    self.assertDictEqual(dict(expected_allocation, **allocation), allocation)
    # Same trib-slot check as the SPDR-SA1 counterpart of this test.
    self.assertIn(1, allocation['trib-slots'])
1337 def test_79_check_ODU0_connection_spdrc(self):
1338 response = test_utils.check_netconf_node_request(
1340 "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1341 self.assertEqual(response.status_code, requests.codes.ok)
1342 res = response.json()
1345 'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1346 'direction': 'bidirectional'
1349 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1350 res['odu-connection'][0])
1351 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0-service1-1GE'},
1352 res['odu-connection'][0]['destination'])
1353 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0-service1-1GE'},
1354 res['odu-connection'][0]['source'])
def test_80_check_otn_topo_links(self):
    """Check the XPDR3 ODU4 links of the OTN topology with service1-1GE up.

    The topology must hold four links, and both directions of the ODU4
    link between SPDR-SA1-XPDR3 and SPDR-SC1-XPDR3 must report 99000
    available and 1000 used bandwidth.
    """
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
        'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1',
    )
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    topo_links = reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(topo_links), 4)
    for topo_link in topo_links:
        # Links other than the two ODU4 directions are not inspected.
        if topo_link['link-id'] not in odu4_link_ids:
            continue
        self.assertEqual(
            topo_link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
        self.assertEqual(
            topo_link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
def test_81_check_otn_topo_tp(self):
    """Check the XPDR3-NETWORK1 pools of both SPDR nodes with service1-1GE up.

    For nodes SPDR-SA1-XPDR3 and SPDR-SC1-XPDR3, the 'ts-pool' of
    termination point XPDR3-NETWORK1 must hold 79 entries without slot 1,
    and the first 'odtu-tpn-pool' must hold 79 entries without tributary
    port number 1.
    """
    reply = test_utils.get_otn_topo_request()
    # Same guard as the sibling topology tests: report a bad HTTP status
    # as such instead of as a JSON/KeyError further down.
    self.assertEqual(reply.status_code, requests.codes.ok)
    allocated_slots = range(1, 2)
    for node in reply.json()['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR3-NETWORK1':
                continue
            attributes = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            ts_pool = attributes['ts-pool']
            self.assertEqual(len(ts_pool), 79)
            # Check slot by slot: asserting that the *list* [1] is not an
            # element of the pool can never fail, whatever the pool
            # contains.
            for slot in allocated_slots:
                self.assertNotIn(slot, ts_pool)
            tpn_pool = attributes['odtu-tpn-pool'][0]['tpn-pool']
            self.assertEqual(len(tpn_pool), 79)
            self.assertNotIn(1, tpn_pool)
def test_82_delete_1GE_service(self):
    """Request deletion of service1-1GE and wait WAITING seconds."""
    reply = test_utils.service_delete_request("service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    # The RPC only reports the deletion as "in progress"; pause before the
    # following tests look at the result.
    time.sleep(self.WAITING)
def test_83_check_service_list(self):
    """Check that the service list now holds exactly two services."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 2)
def test_84_check_no_ODU0_connection_spdra(self):
    """Check that SPDR-SA1 no longer exposes any 'odu-connection' entry."""
    reply = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(reply.status_code, requests.codes.ok)
    device = reply.json()['org-openroadm-device']
    # Plain key lookup; the former ['odu-connection'][0] expression was
    # just a roundabout spelling of the same string.
    self.assertNotIn('odu-connection', device)
def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
    """Check that the network-side ODU0 interface is gone on SPDR-SA1.

    The lookup is expected to answer with HTTP 409 (conflict).
    """
    # Use the complete interface name, including the '-1GE' service suffix
    # that test_74 read successfully.  The shorter name never existed, so
    # querying it could not detect an interface left behind by the
    # deletion.
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
    """Check that the client-side ODU0 interface is gone on SPDR-SA1.

    The lookup is expected to answer with HTTP 409 (conflict).
    """
    # Use the complete interface name, including the '-1GE' service suffix
    # that test_73 read successfully.  The shorter name never existed, so
    # querying it could not detect an interface left behind by the
    # deletion.
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_87_check_no_interface_10GE_CLIENT_spdra(self):
    """Check that interface XPDR3-CLIENT1-ETHERNET1G is gone on SPDR-SA1.

    The lookup is expected to answer with HTTP 409 (conflict).  Note that
    the method name says 10GE although the queried interface is the 1G
    Ethernet one.
    """
    node_id = "SPDR-SA1"
    resource = "interface/XPDR3-CLIENT1-ETHERNET1G"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_88_check_otn_topo_links(self):
    """Check the XPDR3 ODU4 links after service1-1GE was deleted.

    The topology must still hold four links, and both directions of the
    ODU4 link between SPDR-SA1-XPDR3 and SPDR-SC1-XPDR3 must report 100000
    available and 0 used bandwidth.
    """
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
        'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1',
    )
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    topo_links = reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(topo_links), 4)
    for topo_link in topo_links:
        # Links other than the two ODU4 directions are not inspected.
        if topo_link['link-id'] not in odu4_link_ids:
            continue
        self.assertEqual(
            topo_link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            topo_link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_89_check_otn_topo_tp(self):
    """Check the XPDR3-NETWORK1 pools after service1-1GE was deleted.

    For nodes SPDR-SA1-XPDR3 and SPDR-SC1-XPDR3, both the 'ts-pool' and the
    first 'odtu-tpn-pool' of termination point XPDR3-NETWORK1 must hold 80
    entries again.
    """
    reply = test_utils.get_otn_topo_request()
    # Same guard as the sibling topology tests: report a bad HTTP status
    # as such instead of as a JSON/KeyError further down.
    self.assertEqual(reply.status_code, requests.codes.ok)
    for node in reply.json()['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR3-NETWORK1':
                continue
            attributes = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(attributes['ts-pool']), 80)
            self.assertEqual(
                len(attributes['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_90_delete_ODU4_service(self):
    """Request deletion of service2-ODU4 and wait WAITING seconds."""
    reply = test_utils.service_delete_request("service2-ODU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    # The RPC only reports the deletion as "in progress"; pause before the
    # following tests look at the result.
    time.sleep(self.WAITING)
def test_91_delete_OCH_OTU4_service(self):
    """Request deletion of service2-OCH-OTU4 and wait WAITING seconds."""
    reply = test_utils.service_delete_request("service2-OCH-OTU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    # The RPC only reports the deletion as "in progress"; pause before the
    # following tests look at the result.
    time.sleep(self.WAITING)
def test_92_disconnect_xponders_from_roadm(self):
    """Delete every XPONDER-OUTPUT / XPONDER-INPUT link of the topology.

    Each matching link of the openroadm topology is removed with its own
    delete request, which must answer HTTP 200.
    """
    # The leading "{}" is passed on unformatted; presumably
    # test_utils.delete_request fills in the base URL -- confirm there.
    link_url_prefix = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    xponder_link_types = ('XPONDER-OUTPUT', 'XPONDER-INPUT')
    topo_reply = test_utils.get_ordm_topo_request("")
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    for topo_link in topo_reply.json()['network'][0]['ietf-network-topology:link']:
        if topo_link["org-openroadm-common-network:link-type"] not in xponder_link_types:
            continue
        delete_reply = test_utils.delete_request(
            link_url_prefix + topo_link["link-id"])
        self.assertEqual(delete_reply.status_code, requests.codes.ok)
def test_93_check_openroadm_topology(self):
    """Check that the openroadm topology is left with 18 links."""
    reply = test_utils.get_ordm_topo_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    remaining_links = reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(18, len(remaining_links),
                     'Topology should contain 18 links')
def test_94_disconnect_spdrA(self):
    """Unmount device SPDR-SA1 and expect HTTP 200."""
    device_name = "SPDR-SA1"
    reply = test_utils.unmount_device(device_name)
    self.assertEqual(reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_95_disconnect_spdrC(self):
    """Unmount device SPDR-SC1 and expect HTTP 200."""
    device_name = "SPDR-SC1"
    reply = test_utils.unmount_device(device_name)
    self.assertEqual(reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_96_disconnect_roadmA(self):
    """Unmount device ROADM-A1 and expect HTTP 200."""
    device_name = "ROADM-A1"
    reply = test_utils.unmount_device(device_name)
    self.assertEqual(reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_97_disconnect_roadmC(self):
    """Unmount device ROADM-C1 and expect HTTP 200."""
    device_name = "ROADM-C1"
    reply = test_utils.unmount_device(device_name)
    self.assertEqual(reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
# Script entry point: run the whole suite with verbose per-test output.
if __name__ == "__main__":
    unittest.main(verbosity=2)