3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
19 from common import test_utils
22 class TransportPCEtesting(unittest.TestCase):
25 WAITING = 20 # nominal value is 300
27 cr_serv_sample_data = {"input": {
28 "sdnc-request-header": {
29 "request-id": "request-1",
30 "rpc-action": "service-create",
31 "request-system-id": "appname"
33 "service-name": "service1-OCH-OTU4",
34 "common-id": "commonId",
35 "connection-type": "infrastructure",
37 "service-rate": "100",
38 "node-id": "SPDR-SA1",
39 "service-format": "OTU",
40 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
44 "committed-info-rate": "100000",
45 "committed-burst-size": "64"
50 "port-device-name": "SPDR-SA1-XPDR1",
52 "port-name": "XPDR1-NETWORK1",
53 "port-rack": "000000.00",
54 "port-shelf": "Chassis#1"
57 "lgx-device-name": "Some lgx-device-name",
58 "lgx-port-name": "Some lgx-port-name",
59 "lgx-port-rack": "000000.00",
60 "lgx-port-shelf": "00"
65 "port-device-name": "SPDR-SA1-XPDR1",
67 "port-name": "XPDR1-NETWORK1",
68 "port-rack": "000000.00",
69 "port-shelf": "Chassis#1"
72 "lgx-device-name": "Some lgx-device-name",
73 "lgx-port-name": "Some lgx-port-name",
74 "lgx-port-rack": "000000.00",
75 "lgx-port-shelf": "00"
81 "service-rate": "100",
82 "node-id": "SPDR-SC1",
83 "service-format": "OTU",
84 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
88 "committed-info-rate": "100000",
89 "committed-burst-size": "64"
94 "port-device-name": "SPDR-SC1-XPDR1",
96 "port-name": "XPDR1-NETWORK1",
97 "port-rack": "000000.00",
98 "port-shelf": "Chassis#1"
101 "lgx-device-name": "Some lgx-device-name",
102 "lgx-port-name": "Some lgx-port-name",
103 "lgx-port-rack": "000000.00",
104 "lgx-port-shelf": "00"
109 "port-device-name": "SPDR-SC1-XPDR1",
110 "port-type": "fixed",
111 "port-name": "XPDR1-NETWORK1",
112 "port-rack": "000000.00",
113 "port-shelf": "Chassis#1"
116 "lgx-device-name": "Some lgx-device-name",
117 "lgx-port-name": "Some lgx-port-name",
118 "lgx-port-rack": "000000.00",
119 "lgx-port-shelf": "00"
124 "due-date": "2018-06-15T00:00:01Z",
125 "operator-contact": "pw1234"
131 cls.processes = test_utils.start_tpce()
132 cls.processes = test_utils.start_sims(
133 ['spdra', 'roadma', 'roadmc', 'spdrc'])
def tearDownClass(cls):
    """Stop every process recorded in ``cls.processes``.

    Each entry is handed to ``test_utils.shutdown_process``; a confirmation
    line is printed once the loop has finished.
    """
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def test_01_connect_spdrA(self):
    """Call ``test_utils.mount_device("SPDR-SA1", 'spdra')`` and expect ``created``."""
    mount_reply = test_utils.mount_device("SPDR-SA1", 'spdra')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_spdrC(self):
    """Call ``test_utils.mount_device("SPDR-SC1", 'spdrc')`` and expect ``created``."""
    mount_reply = test_utils.mount_device("SPDR-SC1", 'spdrc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    """Call ``test_utils.mount_device("ROADM-A1", 'roadma')`` and expect ``created``."""
    mount_reply = test_utils.mount_device("ROADM-A1", 'roadma')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    """Call ``test_utils.mount_device("ROADM-C1", 'roadmc')`` and expect ``created``."""
    mount_reply = test_utils.mount_device("ROADM-C1", 'roadmc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
    """Issue the xponder-to-ROADM link request for SPDR-SA1 / ROADM-A1.

    Expects an ``ok`` status and the success message inside
    ``output.result`` of the JSON reply.
    """
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
    """Issue the ROADM-to-xponder link request for ROADM-A1 / SPDR-SA1.

    Expects an ``ok`` status and the success message inside
    ``output.result`` of the JSON reply.
    """
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
    """Issue the xponder-to-ROADM link request for SPDR-SC1 / ROADM-C1.

    Expects an ``ok`` status and the success message inside
    ``output.result`` of the JSON reply.
    """
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
    """Issue the ROADM-to-xponder link request for ROADM-C1 / SPDR-SC1.

    Expects an ``ok`` status and the success message inside
    ``output.result`` of the JSON reply.
    """
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
201 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
202 # Config ROADMA-ROADMC oms-attributes
204 "auto-spanloss": "true",
205 "spanloss-base": 11.4,
206 "spanloss-current": 12,
207 "engineered-spanloss": 12.2,
208 "link-concatenation": [{
211 "SRLG-length": 100000,
213 response = test_utils.add_oms_attr_request(
214 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
215 self.assertEqual(response.status_code, requests.codes.created)
217 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
218 # Config ROADMC-ROADMA oms-attributes
220 "auto-spanloss": "true",
221 "spanloss-base": 11.4,
222 "spanloss-current": 12,
223 "engineered-spanloss": 12.2,
224 "link-concatenation": [{
227 "SRLG-length": 100000,
229 response = test_utils.add_oms_attr_request(
230 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
231 self.assertEqual(response.status_code, requests.codes.created)
233 # test service-create for OCH-OTU4 service from spdr to spdr
def test_11_check_otn_topology(self):
    """Check the OTN topology reply: 6 nodes and no link list yet.

    Reads the first entry of ``network`` in the JSON body, asserts that its
    ``node`` list has length 6 and that the key
    ``ietf-network-topology:link`` is absent.
    """
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    otn_network = topo_reply.json()['network'][0]
    node_count = len(otn_network['node'])
    self.assertEqual(node_count, 6, 'There should be 6 nodes')
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_12_create_OCH_OTU4_service(self):
    """Send the service-create request built from ``cr_serv_sample_data``.

    Expects an ``ok`` status and a response message containing
    'PCE calculation in progress', then sleeps ``self.WAITING`` seconds.
    """
    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
250 def test_13_get_OCH_OTU4_service1(self):
251 response = test_utils.get_service_list_request(
252 "services/service1-OCH-OTU4")
253 self.assertEqual(response.status_code, requests.codes.ok)
254 res = response.json()
256 res['services'][0]['administrative-state'], 'inService')
258 res['services'][0]['service-name'], 'service1-OCH-OTU4')
260 res['services'][0]['connection-type'], 'infrastructure')
262 res['services'][0]['lifecycle-state'], 'planned')
265 # Check correct configuration of devices
266 def test_14_check_interface_och_spdra(self):
267 response = test_utils.check_netconf_node_request(
268 "SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
269 self.assertEqual(response.status_code, requests.codes.ok)
270 res = response.json()
271 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
272 'administrative-state': 'inService',
273 'supporting-circuit-pack-name': 'CP1-CFP0',
274 'type': 'org-openroadm-interfaces:opticalChannel',
275 'supporting-port': 'CP1-CFP0-P1'
276 }, **res['interface'][0]),
279 self.assertDictEqual(
280 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
281 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
282 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
284 def test_15_check_interface_OTU4_spdra(self):
285 response = test_utils.check_netconf_node_request(
286 "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
287 self.assertEqual(response.status_code, requests.codes.ok)
288 res = response.json()
289 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
290 'administrative-state': 'inService',
291 'supporting-circuit-pack-name': 'CP1-CFP0',
292 'supporting-interface': 'XPDR1-NETWORK1-761:768',
293 'type': 'org-openroadm-interfaces:otnOtu',
294 'supporting-port': 'CP1-CFP0-P1'
296 input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
297 'expected-dapi': 'Swfw02qXGyI=',
298 'rate': 'org-openroadm-otn-common-types:OTU4',
301 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
304 self.assertDictEqual(input_dict_2,
306 ['org-openroadm-otn-otu-interfaces:otu'])
308 def test_16_check_interface_och_spdrc(self):
309 response = test_utils.check_netconf_node_request(
310 "SPDR-SC1", "interface/XPDR1-NETWORK1-761:768")
311 self.assertEqual(response.status_code, requests.codes.ok)
312 res = response.json()
313 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
314 'administrative-state': 'inService',
315 'supporting-circuit-pack-name': 'CP1-CFP0',
316 'type': 'org-openroadm-interfaces:opticalChannel',
317 'supporting-port': 'CP1-CFP0-P1'
318 }, **res['interface'][0]),
321 self.assertDictEqual(
322 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
323 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
324 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
326 def test_17_check_interface_OTU4_spdrc(self):
327 response = test_utils.check_netconf_node_request(
328 "SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
329 self.assertEqual(response.status_code, requests.codes.ok)
330 res = response.json()
331 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
332 'administrative-state': 'inService',
333 'supporting-circuit-pack-name': 'CP1-CFP0',
334 'supporting-interface': 'XPDR1-NETWORK1-761:768',
335 'type': 'org-openroadm-interfaces:otnOtu',
336 'supporting-port': 'CP1-CFP0-P1'
338 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
339 'expected-sapi': 'Swfw02qXGyI=',
340 'tx-sapi': 'fuYZwEO660g=',
341 'expected-dapi': 'fuYZwEO660g=',
342 'rate': 'org-openroadm-otn-common-types:OTU4',
346 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
349 self.assertDictEqual(input_dict_2,
351 ['org-openroadm-otn-otu-interfaces:otu'])
353 def test_18_check_no_interface_ODU4_spdra(self):
354 response = test_utils.check_netconf_node_request(
355 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
356 self.assertEqual(response.status_code, requests.codes.conflict)
357 res = response.json()
359 {"error-type": "application", "error-tag": "data-missing",
360 "error-message": "Request could not be completed because the relevant data model content does not exist"},
361 res['errors']['error'])
363 def test_19_check_openroadm_topo_spdra(self):
364 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
365 self.assertEqual(response.status_code, requests.codes.ok)
366 res = response.json()
367 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
368 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
369 self.assertEqual({u'frequency': 196.1,
371 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
374 def test_20_check_openroadm_topo_ROADMA_SRG(self):
375 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
376 self.assertEqual(response.status_code, requests.codes.ok)
377 res = response.json()
378 freq_map = base64.b64decode(
379 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
380 freq_map_array = [int(x) for x in freq_map]
381 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
382 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
384 if ele['tp-id'] == 'SRG1-PP1-TXRX':
385 freq_map = base64.b64decode(
386 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
387 freq_map_array = [int(x) for x in freq_map]
388 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
389 if ele['tp-id'] == 'SRG1-PP2-TXRX':
390 self.assertNotIn('avail-freq-maps', dict.keys(ele))
393 def test_21_check_openroadm_topo_ROADMA_DEG(self):
394 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
395 self.assertEqual(response.status_code, requests.codes.ok)
396 res = response.json()
397 freq_map = base64.b64decode(
398 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
399 freq_map_array = [int(x) for x in freq_map]
400 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
401 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
403 if ele['tp-id'] == 'DEG2-CTP-TXRX':
404 freq_map = base64.b64decode(
405 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
406 freq_map_array = [int(x) for x in freq_map]
407 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
408 if ele['tp-id'] == 'DEG2-TTP-TXRX':
409 freq_map = base64.b64decode(
410 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
411 freq_map_array = [int(x) for x in freq_map]
412 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
415 def test_22_check_otn_topo_otu4_links(self):
416 response = test_utils.get_otn_topo_request()
417 self.assertEqual(response.status_code, requests.codes.ok)
418 res = response.json()
419 nb_links = len(res['network'][0]['ietf-network-topology:link'])
420 self.assertEqual(nb_links, 2)
421 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
422 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
423 for link in res['network'][0]['ietf-network-topology:link']:
424 self.assertIn(link['link-id'], listLinkId)
426 link['transportpce-topology:otn-link-type'], 'OTU4')
428 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
430 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
432 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
434 link['org-openroadm-common-network:opposite-link'], listLinkId)
436 # test service-create for ODU4 service from spdr to spdr
def test_23_create_ODU4_service(self):
    """Rewrite the shared request payload for an ODU4 service and submit it.

    ``cr_serv_sample_data`` is modified in place: the service name becomes
    'service1-ODU4' and, on both the A and Z ends, the format is switched
    to 'ODU', the ``otu-service-rate`` key is deleted (``KeyError`` if it
    is absent) and an ``odu-service-rate`` key is added.  The reply must
    be ``ok`` and mention 'PCE calculation in progress'; the test then
    sleeps ``self.WAITING`` seconds.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1-ODU4"
    for end_key in ("service-a-end", "service-z-end"):
        endpoint = request_input[end_key]
        endpoint["service-format"] = "ODU"
        del endpoint["otu-service-rate"]
        endpoint["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"

    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
453 def test_24_get_ODU4_service1(self):
454 response = test_utils.get_service_list_request(
455 "services/service1-ODU4")
456 self.assertEqual(response.status_code, requests.codes.ok)
457 res = response.json()
459 res['services'][0]['administrative-state'], 'inService')
461 res['services'][0]['service-name'], 'service1-ODU4')
463 res['services'][0]['connection-type'], 'infrastructure')
465 res['services'][0]['lifecycle-state'], 'planned')
468 def test_25_check_interface_ODU4_spdra(self):
469 response = test_utils.check_netconf_node_request(
470 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
471 self.assertEqual(response.status_code, requests.codes.ok)
472 res = response.json()
473 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
474 'administrative-state': 'inService',
475 'supporting-circuit-pack-name': 'CP1-CFP0',
476 'supporting-interface': 'XPDR1-NETWORK1-OTU',
477 'type': 'org-openroadm-interfaces:otnOdu',
478 'supporting-port': 'CP1-CFP0-P1'}
479 # SAPI/DAPI are added in the Otu4 renderer
480 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
481 'rate': 'org-openroadm-otn-common-types:ODU4',
482 'expected-dapi': 'Swfw02qXGyI=',
483 'expected-sapi': 'fuYZwEO660g=',
484 'tx-dapi': 'fuYZwEO660g=',
485 'tx-sapi': 'Swfw02qXGyI='}
487 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
489 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
491 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
493 self.assertDictEqual(
494 {u'payload-type': u'21', u'exp-payload-type': u'21'},
495 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
497 def test_26_check_interface_ODU4_spdrc(self):
498 response = test_utils.check_netconf_node_request(
499 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
500 self.assertEqual(response.status_code, requests.codes.ok)
501 res = response.json()
502 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
503 'administrative-state': 'inService',
504 'supporting-circuit-pack-name': 'CP1-CFP0',
505 'supporting-interface': 'XPDR1-NETWORK1-OTU',
506 'type': 'org-openroadm-interfaces:otnOdu',
507 'supporting-port': 'CP1-CFP0-P1'}
508 # SAPI/DAPI are added in the Otu4 renderer
509 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
510 'rate': 'org-openroadm-otn-common-types:ODU4',
511 'tx-sapi': 'fuYZwEO660g=',
512 'tx-dapi': 'Swfw02qXGyI=',
513 'expected-sapi': 'Swfw02qXGyI=',
514 'expected-dapi': 'fuYZwEO660g='
516 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
518 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
520 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
522 self.assertDictEqual(
523 {u'payload-type': u'21', u'exp-payload-type': u'21'},
524 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
526 def test_27_check_otn_topo_links(self):
527 response = test_utils.get_otn_topo_request()
528 self.assertEqual(response.status_code, requests.codes.ok)
529 res = response.json()
530 nb_links = len(res['network'][0]['ietf-network-topology:link'])
531 self.assertEqual(nb_links, 4)
532 for link in res['network'][0]['ietf-network-topology:link']:
533 linkId = link['link-id']
534 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
535 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
537 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
539 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
540 elif (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
541 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
543 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
545 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
547 link['transportpce-topology:otn-link-type'], 'ODTU4')
549 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
550 self.assertIn(link['org-openroadm-common-network:opposite-link'],
551 ['ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
552 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
554 self.fail("this link should not exist")
def test_28_check_otn_topo_tp(self):
    """Check the XPDR1-NETWORK1 termination point on both SPDR XPDR1 nodes.

    For the OTN-topology nodes whose ``node-id`` is 'SPDR-SA1-XPDR1' or
    'SPDR-SC1-XPDR1', the termination point with ``tp-id``
    'XPDR1-NETWORK1' must expose 80 entries in ``ts-pool``, 80 entries in
    the first ``odtu-tpn-pool`` element's ``tpn-pool``, and the
    ODTU4.ts-Allocated ``odtu-type``.
    """
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        # Membership test: the former `== 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'`
        # was always truthy (a non-empty string literal), so no node was
        # ever filtered out.
        if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            tpList = node['ietf-network-topology:termination-point']
            for tp in tpList:
                if tp['tp-id'] == 'XPDR1-NETWORK1':
                    xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
                    self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
                    self.assertEqual(
                        len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
                    self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
                                     'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
571 # test service-create for 10GE service from spdr to spdr
def test_29_create_10GE_service(self):
    """Rewrite the shared request payload for a 10GE service and submit it.

    ``cr_serv_sample_data`` is modified in place: service name
    'service1-10GE', connection type 'service', and on both the A and Z
    ends the rate becomes '10', the format 'Ethernet', the
    ``odu-service-rate`` key is deleted (``KeyError`` if it is absent) and
    the tx/rx port names are set to 'XPDR1-CLIENT1'.  The reply must be
    ``ok`` and mention 'PCE calculation in progress'; the test then sleeps
    ``self.WAITING`` seconds.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1-10GE"
    request_input["connection-type"] = "service"
    for end_key in ("service-a-end", "service-z-end"):
        endpoint = request_input[end_key]
        endpoint["service-rate"] = "10"
        endpoint["service-format"] = "Ethernet"
        del endpoint["odu-service-rate"]
        for direction in ("tx-direction", "rx-direction"):
            endpoint[direction]["port"]["port-name"] = "XPDR1-CLIENT1"
    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
592 def test_30_get_10GE_service1(self):
593 response = test_utils.get_service_list_request(
594 "services/service1-10GE")
595 self.assertEqual(response.status_code, requests.codes.ok)
596 res = response.json()
598 res['services'][0]['administrative-state'], 'inService')
600 res['services'][0]['service-name'], 'service1-10GE')
602 res['services'][0]['connection-type'], 'service')
604 res['services'][0]['lifecycle-state'], 'planned')
607 def test_31_check_interface_10GE_CLIENT_spdra(self):
608 response = test_utils.check_netconf_node_request(
609 "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
610 self.assertEqual(response.status_code, requests.codes.ok)
611 res = response.json()
612 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
613 'administrative-state': 'inService',
614 'supporting-circuit-pack-name': 'CP1-SFP4',
615 'type': 'org-openroadm-interfaces:ethernetCsmacd',
616 'supporting-port': 'CP1-SFP4-P1'
618 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
620 self.assertDictEqual(
622 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
624 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
625 response = test_utils.check_netconf_node_request(
626 "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
627 self.assertEqual(response.status_code, requests.codes.ok)
628 res = response.json()
629 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
630 'administrative-state': 'inService',
631 'supporting-circuit-pack-name': 'CP1-SFP4',
632 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
633 'type': 'org-openroadm-interfaces:otnOdu',
634 'supporting-port': 'CP1-SFP4-P1'}
636 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
637 'rate': 'org-openroadm-otn-common-types:ODU2e',
638 'monitoring-mode': 'terminated'}
640 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
642 self.assertDictEqual(dict(input_dict_2,
643 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
644 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
645 self.assertDictEqual(
646 {u'payload-type': u'03', u'exp-payload-type': u'03'},
647 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
649 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
650 response = test_utils.check_netconf_node_request(
651 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
652 self.assertEqual(response.status_code, requests.codes.ok)
653 res = response.json()
654 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
655 'administrative-state': 'inService',
656 'supporting-circuit-pack-name': 'CP1-CFP0',
657 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
658 'type': 'org-openroadm-interfaces:otnOdu',
659 'supporting-port': 'CP1-CFP0-P1'}
661 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
662 'rate': 'org-openroadm-otn-common-types:ODU2e',
663 'monitoring-mode': 'monitored'}
664 input_dict_3 = {'trib-port-number': 1}
666 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
668 self.assertDictEqual(dict(input_dict_2,
669 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
670 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
671 self.assertDictEqual(dict(input_dict_3,
672 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
673 'parent-odu-allocation']),
674 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
675 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
678 def test_34_check_ODU2E_connection_spdra(self):
679 response = test_utils.check_netconf_node_request(
681 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
682 self.assertEqual(response.status_code, requests.codes.ok)
683 res = response.json()
686 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
687 'direction': 'bidirectional'
690 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
691 res['odu-connection'][0])
692 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
693 res['odu-connection'][0]['destination'])
694 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
695 res['odu-connection'][0]['source'])
697 def test_35_check_interface_10GE_CLIENT_spdrc(self):
698 response = test_utils.check_netconf_node_request(
699 "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
700 self.assertEqual(response.status_code, requests.codes.ok)
701 res = response.json()
702 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
703 'administrative-state': 'inService',
704 'supporting-circuit-pack-name': 'CP1-SFP4',
705 'type': 'org-openroadm-interfaces:ethernetCsmacd',
706 'supporting-port': 'CP1-SFP4-P1'
708 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
710 self.assertDictEqual(
712 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
714 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
715 response = test_utils.check_netconf_node_request(
716 "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
717 self.assertEqual(response.status_code, requests.codes.ok)
718 res = response.json()
719 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
720 'administrative-state': 'inService',
721 'supporting-circuit-pack-name': 'CP1-SFP4',
722 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
723 'type': 'org-openroadm-interfaces:otnOdu',
724 'supporting-port': 'CP1-SFP4-P1'}
726 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
727 'rate': 'org-openroadm-otn-common-types:ODU2e',
728 'monitoring-mode': 'terminated'}
730 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
732 self.assertDictEqual(dict(input_dict_2,
733 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
734 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
735 self.assertDictEqual(
736 {u'payload-type': u'03', u'exp-payload-type': u'03'},
737 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
739 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
740 response = test_utils.check_netconf_node_request(
741 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
742 self.assertEqual(response.status_code, requests.codes.ok)
743 res = response.json()
744 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
745 'administrative-state': 'inService',
746 'supporting-circuit-pack-name': 'CP1-CFP0',
747 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
748 'type': 'org-openroadm-interfaces:otnOdu',
749 'supporting-port': 'CP1-CFP0-P1'}
751 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
752 'rate': 'org-openroadm-otn-common-types:ODU2e',
753 'monitoring-mode': 'monitored'}
755 input_dict_3 = {'trib-port-number': 1}
757 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
759 self.assertDictEqual(dict(input_dict_2,
760 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
761 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
762 self.assertDictEqual(dict(input_dict_3,
763 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
764 'parent-odu-allocation']),
765 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
766 'parent-odu-allocation'])
769 'org-openroadm-otn-odu-interfaces:odu'][
770 'parent-odu-allocation']['trib-slots'])
772 def test_38_check_ODU2E_connection_spdrc(self):
773 response = test_utils.check_netconf_node_request(
775 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
776 self.assertEqual(response.status_code, requests.codes.ok)
777 res = response.json()
780 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
781 'direction': 'bidirectional'
784 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
785 res['odu-connection'][0])
786 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
787 res['odu-connection'][0]['destination'])
788 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
789 res['odu-connection'][0]['source'])
791 def test_39_check_otn_topo_links(self):
792 response = test_utils.get_otn_topo_request()
793 self.assertEqual(response.status_code, requests.codes.ok)
794 res = response.json()
795 nb_links = len(res['network'][0]['ietf-network-topology:link'])
796 self.assertEqual(nb_links, 4)
797 for link in res['network'][0]['ietf-network-topology:link']:
798 linkId = link['link-id']
799 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
800 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
802 link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
804 link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
806 def test_40_check_otn_topo_tp(self):
807 response = test_utils.get_otn_topo_request()
808 res = response.json()
809 for node in res['network'][0]['node']:
810 if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
811 tpList = node['ietf-network-topology:termination-point']
813 if tp['tp-id'] == 'XPDR1-NETWORK1':
814 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
815 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
816 tsPoolList = list(range(1, 9))
818 tsPoolList, xpdrTpPortConAt['ts-pool'])
820 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
822 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_41_delete_10GE_service(self):
    """Send the service-delete request for 'service1-10GE'.

    Expects an ``ok`` status and a response message containing
    'Renderer service delete in progress', then sleeps ``self.WAITING``
    seconds.
    """
    delete_reply = test_utils.service_delete_request("service1-10GE")
    self.assertEqual(delete_reply.status_code, requests.codes.ok)
    common = delete_reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    time.sleep(self.WAITING)
def test_42_check_service_list(self):
    """Expect exactly 2 entries under ``service-list.services``."""
    list_reply = test_utils.get_service_list_request("")
    self.assertEqual(list_reply.status_code, requests.codes.ok)
    remaining = list_reply.json()['service-list']['services']
    self.assertEqual(len(remaining), 2)
def test_43_check_no_ODU2e_connection_spdra(self):
    """Expect no 'odu-connection' key in SPDR-SA1's ``org-openroadm-device`` data.

    Fetches the node with an empty sub-path, requires an ``ok`` status and
    asserts that 'odu-connection' is not contained in the
    ``org-openroadm-device`` element of the JSON reply.
    """
    response = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Plain key: the former `['odu-connection'][0]` built a one-element list
    # only to index it again, which evaluates to this same string.
    self.assertNotIn('odu-connection', res['org-openroadm-device'])
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
    """Expect a conflict status when reading the network ODU2e interface on SPDR-SA1."""
    # NOTE(review): the ODU0 interfaces checked later in this file carry the
    # full service name as suffix; confirm this name matches the interface
    # that was actually created.
    interface_path = "interface/XPDR1-NETWORK1-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
    """Expect a conflict status when reading the client ODU2e interface on SPDR-SA1."""
    # NOTE(review): the ODU0 interfaces checked later in this file carry the
    # full service name as suffix; confirm this name matches the interface
    # that was actually created.
    interface_path = "interface/XPDR1-CLIENT1-ODU2e-service1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
    """Expect a conflict status when reading the 10G ethernet client interface on SPDR-SA1."""
    interface_path = "interface/XPDR1-CLIENT1-ETHERNET10G"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_47_check_otn_topo_links(self):
    """Check the OTN topology link count and the XPDR1 ODU4 bandwidth figures.

    Four links are expected; every link matching one of the two XPDR1 ODU4
    link ids must report 100000 available and 0 used bandwidth.
    """
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()
    self.assertEqual(len(topology['network'][0]['ietf-network-topology:link']), 4)
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
    )
    for otn_link in topology['network'][0]['ietf-network-topology:link']:
        if otn_link['link-id'] not in odu4_link_ids:
            continue
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_48_check_otn_topo_tp(self):
    """Check that the XPDR1-NETWORK1 pools of both SPDR XPDR1 nodes are full.

    Both the trib-slot pool and the trib-port pool must hold 80 entries.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        # "x == 'A' or 'B'" is always true because a non-empty string is
        # truthy; a membership test restricts the check to the two nodes.
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
    """Request deletion of service1-ODU4, then pause for WAITING seconds."""
    reply = test_utils.service_delete_request("service1-ODU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_50_check_service_list(self):
    """Expect exactly one entry in the service list."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 1)
def test_51_check_no_interface_ODU4_spdra(self):
    """Expect a conflict status when reading the ODU4 network interface on SPDR-SA1."""
    interface_path = "interface/XPDR1-NETWORK1-ODU4"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_52_check_otn_topo_links(self):
    """Re-run the assertions of test_22_check_otn_topo_otu4_links."""
    self.test_22_check_otn_topo_otu4_links()
def test_53_check_otn_topo_tp(self):
    """Check that XPDR1-NETWORK1 on both SPDR XPDR1 nodes exposes no pools.

    Neither 'ts-pool' nor 'odtu-tpn-pool' may appear in the termination
    point's port connection attributes.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        # "x == 'A' or 'B'" is always true because a non-empty string is
        # truthy; a membership test restricts the check to the two nodes.
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            # Membership on a dict already tests its keys.
            self.assertNotIn('ts-pool', xpdrTpPortConAt)
            self.assertNotIn('odtu-tpn-pool', xpdrTpPortConAt)
def test_54_delete_OCH_OTU4_service(self):
    """Request deletion of service1-OCH-OTU4, then pause for WAITING seconds."""
    reply = test_utils.service_delete_request("service1-OCH-OTU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_55_get_no_service(self):
    """Expect a conflict status and a 'data-missing' error from the service list."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.conflict)
    payload = reply.json()
    missing_data_error = {
        "error-type": "application",
        "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant data model content does not exist",
    }
    self.assertIn(missing_data_error, payload['errors']['error'])
def test_56_check_no_interface_OTU4_spdra(self):
    """Expect a conflict status when reading the OTU network interface on SPDR-SA1."""
    interface_path = "interface/XPDR1-NETWORK1-OTU"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_57_check_no_interface_OCH_spdra(self):
    """Expect a conflict status when reading interface XPDR1-NETWORK1-1 on SPDR-SA1."""
    interface_path = "interface/XPDR1-NETWORK1-1"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
    """Check that the OTN topology network no longer has a link list."""
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_59_check_openroadm_topo_spdra(self):
    """Check that XPDR1-NETWORK1 of SPDR-SA1-XPDR1 carries no 'wavelength' attribute."""
    reply = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_tp = reply.json()['node'][0]['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR1-NETWORK1', first_tp['tp-id'])
    network_attributes = first_tp['org-openroadm-network-topology:xpdr-network-attributes']
    self.assertNotIn('wavelength', network_attributes.keys())
def test_60_check_openroadm_topo_ROADMA_SRG(self):
    """Check byte 95 of the frequency maps of ROADM-A1-SRG1 and its SRG1-PP1-TXRX port.

    The byte must equal 255 in the node-level SRG map and in the map of
    every termination point named SRG1-PP1-TXRX.
    """
    reply = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    srg_node = reply.json()['node'][0]
    encoded_map = srg_node['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map']
    srg_bytes = [int(octet) for octet in base64.b64decode(encoded_map)]
    self.assertEqual(srg_bytes[95], 255, "Lambda 1 should be available")
    for term_point in srg_node['ietf-network-topology:termination-point']:
        if term_point['tp-id'] != 'SRG1-PP1-TXRX':
            continue
        encoded_map = term_point['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map']
        pp_bytes = [int(octet) for octet in base64.b64decode(encoded_map)]
        self.assertEqual(pp_bytes[95], 255, "Lambda 1 should be available")
def test_61_check_openroadm_topo_ROADMA_DEG(self):
    """Check byte 95 of the frequency maps of ROADM-A1-DEG2 and its CTP/TTP ports.

    The byte must equal 255 in the node-level degree map and in the maps of
    the DEG2-CTP-TXRX and DEG2-TTP-TXRX termination points.
    """
    reply = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    degree_node = reply.json()['node'][0]
    encoded_map = degree_node['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map']
    degree_bytes = [int(octet) for octet in base64.b64decode(encoded_map)]
    self.assertEqual(degree_bytes[95], 255, "Lambda 1 should be available")
    # Attribute container holding the frequency map, per termination point id.
    attributes_by_tp = {
        'DEG2-CTP-TXRX': 'org-openroadm-network-topology:ctp-attributes',
        'DEG2-TTP-TXRX': 'org-openroadm-network-topology:tx-ttp-attributes',
    }
    for term_point in degree_node['ietf-network-topology:termination-point']:
        attributes_key = attributes_by_tp.get(term_point['tp-id'])
        if attributes_key is None:
            continue
        encoded_map = term_point[attributes_key]['avail-freq-maps'][0]['freq-map']
        tp_bytes = [int(octet) for octet in base64.b64decode(encoded_map)]
        self.assertEqual(tp_bytes[95], 255, "Lambda 1 should be available")
def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
    """Create the xponder-to-ROADM link from SPDR-SA1 to SRG1-PP2-TXRX of ROADM-A1."""
    link_args = ("SPDR-SA1", "3", "1", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result)
def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
    """Create the ROADM-to-xponder link from SRG1-PP2-TXRX of ROADM-A1 to SPDR-SA1."""
    link_args = ("SPDR-SA1", "3", "1", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result)
def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
    """Create the xponder-to-ROADM link from SPDR-SC1 to SRG1-PP2-TXRX of ROADM-C1."""
    link_args = ("SPDR-SC1", "3", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result)
def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
    """Create the ROADM-to-xponder link from SRG1-PP2-TXRX of ROADM-C1 to SPDR-SC1."""
    link_args = ("SPDR-SC1", "3", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result)
def test_66_create_OCH_OTU4_service_2(self):
    """Create service2-OCH-OTU4 between the XPDR3-NETWORK1 ports of SPDR-SA1 and SPDR-SC1.

    The shared class-level request template is edited in place, so later
    tests see these values.
    """
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service2-OCH-OTU4"
    request["connection-type"] = "infrastructure"
    for end_key, device_name in (("service-a-end", "SPDR-SA1-XPDR3"),
                                 ("service-z-end", "SPDR-SC1-XPDR3")):
        endpoint = request[end_key]
        endpoint["service-rate"] = "100"
        endpoint["service-format"] = "OTU"
        for direction in ("tx-direction", "rx-direction"):
            port = endpoint[direction]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-NETWORK1"
        endpoint["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', message)
    time.sleep(self.WAITING)
def test_67_get_OCH_OTU4_service2(self):
    """Check the state, name, type and lifecycle of service2-OCH-OTU4."""
    reply = test_utils.get_service_list_request("services/service2-OCH-OTU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    expected_leaves = (
        ('administrative-state', 'inService'),
        ('service-name', 'service2-OCH-OTU4'),
        ('connection-type', 'infrastructure'),
        ('lifecycle-state', 'planned'),
    )
    for leaf, expected_value in expected_leaves:
        self.assertEqual(service[leaf], expected_value)
def test_68_create_ODU4_service_2(self):
    """Create service2-ODU4 between the XPDR3-NETWORK1 ports of SPDR-SA1 and SPDR-SC1.

    The shared class-level request template is edited in place: both ends
    switch to the ODU format, gain an ODU4 rate and lose their OTU rate.
    """
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service2-ODU4"
    for end_key, device_name in (("service-a-end", "SPDR-SA1-XPDR3"),
                                 ("service-z-end", "SPDR-SC1-XPDR3")):
        endpoint = request[end_key]
        endpoint["service-format"] = "ODU"
        for direction in ("tx-direction", "rx-direction"):
            port = endpoint[direction]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-NETWORK1"
        endpoint["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
        del endpoint["otu-service-rate"]

    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', message)
    time.sleep(self.WAITING)
def test_69_get_ODU4_service2(self):
    """Check the state, name, type and lifecycle of service2-ODU4."""
    reply = test_utils.get_service_list_request("services/service2-ODU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    expected_leaves = (
        ('administrative-state', 'inService'),
        ('service-name', 'service2-ODU4'),
        ('connection-type', 'infrastructure'),
        ('lifecycle-state', 'planned'),
    )
    for leaf, expected_value in expected_leaves:
        self.assertEqual(service[leaf], expected_value)
def test_70_create_1GE_service(self):
    """Create service1-1GE between the XPDR3-CLIENT1 ports of SPDR-SA1 and SPDR-SC1.

    The shared class-level request template is edited in place: both ends
    become 1G Ethernet endpoints and lose their ODU rate.
    """
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service1-1GE"
    request["connection-type"] = "service"
    end_devices = (("service-a-end", "SPDR-SA1-XPDR3"),
                   ("service-z-end", "SPDR-SC1-XPDR3"))
    # Rate and format are set on both ends before anything is removed,
    # matching the original statement order.
    for end_key, _ in end_devices:
        request[end_key]["service-rate"] = "1"
        request[end_key]["service-format"] = "Ethernet"
    for end_key, device_name in end_devices:
        endpoint = request[end_key]
        del endpoint["odu-service-rate"]
        for direction in ("tx-direction", "rx-direction"):
            port = endpoint[direction]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-CLIENT1"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', message)
    time.sleep(self.WAITING)
def test_71_get_1GE_service1(self):
    """Check the state, name, type and lifecycle of service1-1GE."""
    reply = test_utils.get_service_list_request("services/service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    expected_leaves = (
        ('administrative-state', 'inService'),
        ('service-name', 'service1-1GE'),
        ('connection-type', 'service'),
        ('lifecycle-state', 'planned'),
    )
    for leaf, expected_value in expected_leaves:
        self.assertEqual(service[leaf], expected_value)
1160 def test_72_check_interface_1GE_CLIENT_spdra(self):
1161 response = test_utils.check_netconf_node_request(
1162 "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1163 self.assertEqual(response.status_code, requests.codes.ok)
1164 res = response.json()
1165 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1166 'administrative-state': 'inService',
1167 'supporting-circuit-pack-name': 'CP1-SFP4',
1168 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1169 'supporting-port': 'CP1-SFP4-P1'
1171 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1172 res['interface'][0])
1173 self.assertDictEqual(
1175 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_73_check_interface_ODU0_CLIENT_spdra(self):
    """Check the client-side ODU0 interface of service1-1GE on SPDR-SA1.

    The expected interface and ODU leaves only have to be a subset of what
    the device returns; the OPU container must match exactly.
    """
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    expected_interface = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
                          'administrative-state': 'inService',
                          'supporting-circuit-pack-name': 'CP3-SFP1',
                          'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
                          'type': 'org-openroadm-interfaces:otnOdu',
                          'supporting-port': 'CP3-SFP1-P1'}
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'terminated'}

    # Overlaying the actual data on the expected one reproduces the actual
    # data only when every expected leaf has the expected value.
    interface = payload['interface'][0]
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    self.assertDictEqual(
        {'payload-type': '07', 'exp-payload-type': '07'},
        odu['opu'])
def test_74_check_interface_ODU0_NETWORK_spdra(self):
    """Check the network-side ODU0 interface of service1-1GE on SPDR-SA1.

    The expected interface, ODU and parent allocation leaves only have to be
    a subset of what the device returns; trib slot 1 must be allocated.
    """
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    expected_interface = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
                          'administrative-state': 'inService',
                          'supporting-circuit-pack-name': 'CP3-CFP0',
                          'supporting-interface': 'XPDR3-NETWORK1-ODU4',
                          'type': 'org-openroadm-interfaces:otnOdu',
                          'supporting-port': 'CP3-CFP0-P1'}
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'monitored'}
    expected_allocation = {'trib-port-number': 1}

    # Overlaying the actual data on the expected one reproduces the actual
    # data only when every expected leaf has the expected value.
    interface = payload['interface'][0]
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    allocation = odu['parent-odu-allocation']
    self.assertDictEqual(dict(expected_allocation, **allocation), allocation)
    self.assertIn(1, allocation['trib-slots'])
1231 def test_75_check_ODU0_connection_spdra(self):
1232 response = test_utils.check_netconf_node_request(
1234 "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1235 self.assertEqual(response.status_code, requests.codes.ok)
1236 res = response.json()
1239 'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1240 'direction': 'bidirectional'
1243 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1244 res['odu-connection'][0])
1245 self.assertDictEqual({u'dst-if': u'XPDR3-NETWORK1-ODU0-service1-1GE'},
1246 res['odu-connection'][0]['destination'])
1247 self.assertDictEqual({u'src-if': u'XPDR3-CLIENT1-ODU0-service1-1GE'},
1248 res['odu-connection'][0]['source'])
1250 def test_76_check_interface_1GE_CLIENT_spdrc(self):
1251 response = test_utils.check_netconf_node_request(
1252 "SPDR-SC1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1253 self.assertEqual(response.status_code, requests.codes.ok)
1254 res = response.json()
1255 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1256 'administrative-state': 'inService',
1257 'supporting-circuit-pack-name': 'CP3-SFP1',
1258 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1259 'supporting-port': 'CP3-SFP1-P1'
1261 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1262 res['interface'][0])
1263 self.assertDictEqual(
1265 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_77_check_interface_ODU0_CLIENT_spdrc(self):
    """Check the client-side ODU0 interface of service1-1GE on SPDR-SC1.

    The expected interface and ODU leaves only have to be a subset of what
    the device returns; the OPU container must match exactly.
    """
    reply = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    expected_interface = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
                          'administrative-state': 'inService',
                          'supporting-circuit-pack-name': 'CP3-SFP1',
                          'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
                          'type': 'org-openroadm-interfaces:otnOdu',
                          'supporting-port': 'CP3-SFP1-P1'}
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'terminated'}

    # Overlaying the actual data on the expected one reproduces the actual
    # data only when every expected leaf has the expected value.
    interface = payload['interface'][0]
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    self.assertDictEqual(
        {'payload-type': '07', 'exp-payload-type': '07'},
        odu['opu'])
def test_78_check_interface_ODU0_NETWORK_spdrc(self):
    """Check the network-side ODU0 interface of service1-1GE on SPDR-SC1.

    The expected interface, ODU and parent allocation leaves only have to be
    a subset of what the device returns; trib slot 1 must be allocated.
    """
    reply = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    expected_interface = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
                          'administrative-state': 'inService',
                          'supporting-circuit-pack-name': 'CP3-CFP0',
                          'supporting-interface': 'XPDR3-NETWORK1-ODU4',
                          'type': 'org-openroadm-interfaces:otnOdu',
                          'supporting-port': 'CP3-CFP0-P1'}
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'monitored'}

    expected_allocation = {'trib-port-number': 1}

    # Overlaying the actual data on the expected one reproduces the actual
    # data only when every expected leaf has the expected value.
    interface = payload['interface'][0]
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    allocation = odu['parent-odu-allocation']
    self.assertDictEqual(dict(expected_allocation, **allocation), allocation)
    self.assertIn(1, allocation['trib-slots'])
1325 def test_79_check_ODU0_connection_spdrc(self):
1326 response = test_utils.check_netconf_node_request(
1328 "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1329 self.assertEqual(response.status_code, requests.codes.ok)
1330 res = response.json()
1333 'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1334 'direction': 'bidirectional'
1337 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1338 res['odu-connection'][0])
1339 self.assertDictEqual({u'dst-if': u'XPDR3-NETWORK1-ODU0-service1-1GE'},
1340 res['odu-connection'][0]['destination'])
1341 self.assertDictEqual({u'src-if': u'XPDR3-CLIENT1-ODU0-service1-1GE'},
1342 res['odu-connection'][0]['source'])
def test_80_check_otn_topo_links(self):
    """Check the OTN topology link count and the XPDR3 ODU4 bandwidth figures.

    Four links are expected; every link matching one of the two XPDR3 ODU4
    link ids must report 99000 available and 1000 used bandwidth.
    """
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()
    self.assertEqual(len(topology['network'][0]['ietf-network-topology:link']), 4)
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
        'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1',
    )
    for otn_link in topology['network'][0]['ietf-network-topology:link']:
        if otn_link['link-id'] not in odu4_link_ids:
            continue
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
def test_81_check_otn_topo_tp(self):
    """Check the XPDR3-NETWORK1 pools of both SPDR XPDR3 nodes in the OTN topology.

    The trib-slot pool must hold 79 entries and not slot 1; the trib-port
    pool must hold 79 entries and not port 1.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        # "x == 'A' or 'B'" is always true because a non-empty string is
        # truthy; a membership test restricts the check to the two nodes.
        if node['node-id'] not in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR3-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
            # Test every slot on its own: a list object is never an
            # element of a pool of integers, so a list-level assertNotIn
            # could not fail.
            for slot in range(1, 2):
                self.assertNotIn(slot, xpdrTpPortConAt['ts-pool'])
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
            self.assertNotIn(
                1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_82_delete_1GE_service(self):
    """Request deletion of service1-1GE, then pause for WAITING seconds."""
    reply = test_utils.service_delete_request("service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_83_check_service_list(self):
    """Expect exactly two entries in the service list."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 2)
def test_84_check_no_ODU0_connection_spdra(self):
    """Check that the SPDR-SA1 device data holds no 'odu-connection' entry."""
    response = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The key is a plain string; indexing a one-element list literal to
    # obtain it only obscured the intent.
    self.assertNotIn('odu-connection', res['org-openroadm-device'])
def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
    """Expect a conflict status when reading the network ODU0 interface on SPDR-SA1.

    The interface is queried under the exact name verified by
    test_74_check_interface_ODU0_NETWORK_spdra; the shorter
    '...-ODU0-service1' name was never created, so checking it could not
    detect a leftover interface.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
    """Expect a conflict status when reading the client ODU0 interface on SPDR-SA1.

    The interface is queried under the exact name verified by
    test_73_check_interface_ODU0_CLIENT_spdra; the shorter
    '...-ODU0-service1' name was never created, so checking it could not
    detect a leftover interface.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_87_check_no_interface_10GE_CLIENT_spdra(self):
    """Expect a conflict status when reading XPDR3-CLIENT1-ETHERNET1G on SPDR-SA1.

    Despite the "10GE" in the test name, the interface queried is the 1G
    ethernet client interface.
    """
    interface_path = "interface/XPDR3-CLIENT1-ETHERNET1G"
    reply = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_88_check_otn_topo_links(self):
    """Check the OTN topology link count and the XPDR3 ODU4 bandwidth figures.

    Four links are expected; every link matching one of the two XPDR3 ODU4
    link ids must report 100000 available and 0 used bandwidth.
    """
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()
    self.assertEqual(len(topology['network'][0]['ietf-network-topology:link']), 4)
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
        'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1',
    )
    for otn_link in topology['network'][0]['ietf-network-topology:link']:
        if otn_link['link-id'] not in odu4_link_ids:
            continue
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_89_check_otn_topo_tp(self):
    """Check that the XPDR3-NETWORK1 pools of both SPDR XPDR3 nodes are full.

    Both the trib-slot pool and the trib-port pool must hold 80 entries.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        # "x == 'A' or 'B'" is always true because a non-empty string is
        # truthy; a membership test restricts the check to the two nodes.
        if node['node-id'] not in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR3-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_90_delete_ODU4_service(self):
    """Request deletion of service2-ODU4, then pause for WAITING seconds."""
    reply = test_utils.service_delete_request("service2-ODU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_91_delete_OCH_OTU4_service(self):
    """Request deletion of service2-OCH-OTU4, then pause for WAITING seconds."""
    reply = test_utils.service_delete_request("service2-OCH-OTU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    message = reply.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_92_disconnect_xponders_from_roadm(self):
    """Delete every XPONDER-OUTPUT and XPONDER-INPUT link of the openroadm topology.

    Each deletion must answer with an OK status.
    """
    # NOTE(review): the "{}" placeholder is passed through untouched;
    # presumably test_utils.delete_request fills in the base URL - confirm.
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    reply = test_utils.get_ordm_topo_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()
    xponder_link_types = ("XPONDER-OUTPUT", "XPONDER-INPUT")
    for topo_link in topology['network'][0]['ietf-network-topology:link']:
        if topo_link["org-openroadm-common-network:link-type"] not in xponder_link_types:
            continue
        deletion = test_utils.delete_request(url + topo_link["link-id"])
        self.assertEqual(deletion.status_code, requests.codes.ok)
def test_93_check_openroadm_topology(self):
    """Expect 18 links in the openroadm topology."""
    reply = test_utils.get_ordm_topo_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    topo_links = reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(18, len(topo_links), 'Topology should contain 18 links')
def test_94_disconnect_spdrA(self):
    """Unmount SPDR-SA1 and expect an OK status."""
    reply = test_utils.unmount_device("SPDR-SA1")
    failure_hint = test_utils.CODE_SHOULD_BE_200
    self.assertEqual(reply.status_code, requests.codes.ok, failure_hint)
def test_95_disconnect_spdrC(self):
    """Unmount SPDR-SC1 and expect an OK status."""
    reply = test_utils.unmount_device("SPDR-SC1")
    failure_hint = test_utils.CODE_SHOULD_BE_200
    self.assertEqual(reply.status_code, requests.codes.ok, failure_hint)
def test_96_disconnect_roadmA(self):
    """Unmount ROADM-A1 and expect an OK status."""
    reply = test_utils.unmount_device("ROADM-A1")
    failure_hint = test_utils.CODE_SHOULD_BE_200
    self.assertEqual(reply.status_code, requests.codes.ok, failure_hint)
def test_97_disconnect_roadmC(self):
    """Unmount ROADM-C1 and expect an OK status."""
    reply = test_utils.unmount_device("ROADM-C1")
    failure_hint = test_utils.CODE_SHOULD_BE_200
    self.assertEqual(reply.status_code, requests.codes.ok, failure_hint)
# Run the test case with per-test verbose output when executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)