3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
20 sys.path.append('transportpce_tests/common/')
24 class TransportPCEtesting(unittest.TestCase):
27 WAITING = 20 # nominal value is 300
28 NODE_VERSION = '2.2.1'
30 cr_serv_sample_data = {"input": {
31 "sdnc-request-header": {
32 "request-id": "request-1",
33 "rpc-action": "service-create",
34 "request-system-id": "appname"
36 "service-name": "service1-OCH-OTU4",
37 "common-id": "commonId",
38 "connection-type": "infrastructure",
40 "service-rate": "100",
41 "node-id": "SPDR-SA1",
42 "service-format": "OTU",
43 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
47 "committed-info-rate": "100000",
48 "committed-burst-size": "64"
53 "port-device-name": "SPDR-SA1-XPDR1",
55 "port-name": "XPDR1-NETWORK1",
56 "port-rack": "000000.00",
57 "port-shelf": "Chassis#1"
60 "lgx-device-name": "Some lgx-device-name",
61 "lgx-port-name": "Some lgx-port-name",
62 "lgx-port-rack": "000000.00",
63 "lgx-port-shelf": "00"
68 "port-device-name": "SPDR-SA1-XPDR1",
70 "port-name": "XPDR1-NETWORK1",
71 "port-rack": "000000.00",
72 "port-shelf": "Chassis#1"
75 "lgx-device-name": "Some lgx-device-name",
76 "lgx-port-name": "Some lgx-port-name",
77 "lgx-port-rack": "000000.00",
78 "lgx-port-shelf": "00"
84 "service-rate": "100",
85 "node-id": "SPDR-SC1",
86 "service-format": "OTU",
87 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
91 "committed-info-rate": "100000",
92 "committed-burst-size": "64"
97 "port-device-name": "SPDR-SC1-XPDR1",
99 "port-name": "XPDR1-NETWORK1",
100 "port-rack": "000000.00",
101 "port-shelf": "Chassis#1"
104 "lgx-device-name": "Some lgx-device-name",
105 "lgx-port-name": "Some lgx-port-name",
106 "lgx-port-rack": "000000.00",
107 "lgx-port-shelf": "00"
112 "port-device-name": "SPDR-SC1-XPDR1",
113 "port-type": "fixed",
114 "port-name": "XPDR1-NETWORK1",
115 "port-rack": "000000.00",
116 "port-shelf": "Chassis#1"
119 "lgx-device-name": "Some lgx-device-name",
120 "lgx-port-name": "Some lgx-port-name",
121 "lgx-port-rack": "000000.00",
122 "lgx-port-shelf": "00"
127 "due-date": "2018-06-15T00:00:01Z",
128 "operator-contact": "pw1234"
134 cls.processes = test_utils.start_tpce()
135 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
136 ('roadma', cls.NODE_VERSION),
137 ('roadmc', cls.NODE_VERSION),
138 ('spdrc', cls.NODE_VERSION)])
141 def tearDownClass(cls):
142 # pylint: disable=not-an-iterable
143 for process in cls.processes:
144 test_utils.shutdown_process(process)
145 print("all processes killed")
150 def test_01_connect_spdrA(self):
151 response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
152 self.assertEqual(response.status_code,
153 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
155 def test_02_connect_spdrC(self):
156 response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
157 self.assertEqual(response.status_code,
158 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160 def test_03_connect_rdmA(self):
161 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
162 self.assertEqual(response.status_code,
163 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165 def test_04_connect_rdmC(self):
166 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
167 self.assertEqual(response.status_code,
168 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
170 def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
171 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
172 "ROADM-A1", "1", "SRG1-PP1-TXRX")
173 self.assertEqual(response.status_code, requests.codes.ok)
174 res = response.json()
175 self.assertIn('Xponder Roadm Link created successfully',
176 res["output"]["result"])
179 def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
180 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
181 "ROADM-A1", "1", "SRG1-PP1-TXRX")
182 self.assertEqual(response.status_code, requests.codes.ok)
183 res = response.json()
184 self.assertIn('Roadm Xponder links created successfully',
185 res["output"]["result"])
188 def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
189 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
190 "ROADM-C1", "1", "SRG1-PP1-TXRX")
191 self.assertEqual(response.status_code, requests.codes.ok)
192 res = response.json()
193 self.assertIn('Xponder Roadm Link created successfully',
194 res["output"]["result"])
197 def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
198 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
199 "ROADM-C1", "1", "SRG1-PP1-TXRX")
200 self.assertEqual(response.status_code, requests.codes.ok)
201 res = response.json()
202 self.assertIn('Roadm Xponder links created successfully',
203 res["output"]["result"])
206 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
207 # Config ROADMA-ROADMC oms-attributes
209 "auto-spanloss": "true",
210 "spanloss-base": 11.4,
211 "spanloss-current": 12,
212 "engineered-spanloss": 12.2,
213 "link-concatenation": [{
216 "SRLG-length": 100000,
218 response = test_utils.add_oms_attr_request(
219 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
220 self.assertEqual(response.status_code, requests.codes.created)
222 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
223 # Config ROADMC-ROADMA oms-attributes
225 "auto-spanloss": "true",
226 "spanloss-base": 11.4,
227 "spanloss-current": 12,
228 "engineered-spanloss": 12.2,
229 "link-concatenation": [{
232 "SRLG-length": 100000,
234 response = test_utils.add_oms_attr_request(
235 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
236 self.assertEqual(response.status_code, requests.codes.created)
238 # test service-create for OCH-OTU4 service from spdr to spdr
239 def test_11_check_otn_topology(self):
240 response = test_utils.get_otn_topo_request()
241 self.assertEqual(response.status_code, requests.codes.ok)
242 res = response.json()
243 nbNode = len(res['network'][0]['node'])
244 self.assertEqual(nbNode, 6, 'There should be 6 nodes')
245 self.assertNotIn('ietf-network-topology:link', res['network'][0])
247 def test_12_create_OCH_OTU4_service(self):
248 response = test_utils.service_create_request(self.cr_serv_sample_data)
249 self.assertEqual(response.status_code, requests.codes.ok)
250 res = response.json()
251 self.assertIn('PCE calculation in progress',
252 res['output']['configuration-response-common']['response-message'])
253 time.sleep(self.WAITING)
255 def test_13_get_OCH_OTU4_service1(self):
256 response = test_utils.get_service_list_request(
257 "services/service1-OCH-OTU4")
258 self.assertEqual(response.status_code, requests.codes.ok)
259 res = response.json()
261 res['services'][0]['administrative-state'], 'inService')
263 res['services'][0]['service-name'], 'service1-OCH-OTU4')
265 res['services'][0]['connection-type'], 'infrastructure')
267 res['services'][0]['lifecycle-state'], 'planned')
270 # Check correct configuration of devices
271 def test_14_check_interface_och_spdra(self):
272 response = test_utils.check_netconf_node_request(
273 "SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
274 self.assertEqual(response.status_code, requests.codes.ok)
275 res = response.json()
276 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
277 'administrative-state': 'inService',
278 'supporting-circuit-pack-name': 'CP1-CFP0',
279 'type': 'org-openroadm-interfaces:opticalChannel',
280 'supporting-port': 'CP1-CFP0-P1'
281 }, **res['interface'][0]),
284 self.assertDictEqual(
285 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
286 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
287 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
289 def test_15_check_interface_OTU4_spdra(self):
290 response = test_utils.check_netconf_node_request(
291 "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
292 self.assertEqual(response.status_code, requests.codes.ok)
293 res = response.json()
294 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
295 'administrative-state': 'inService',
296 'supporting-circuit-pack-name': 'CP1-CFP0',
297 'supporting-interface': 'XPDR1-NETWORK1-761:768',
298 'type': 'org-openroadm-interfaces:otnOtu',
299 'supporting-port': 'CP1-CFP0-P1'
301 input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
302 'expected-dapi': 'Swfw02qXGyI=',
303 'rate': 'org-openroadm-otn-common-types:OTU4',
306 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
309 self.assertDictEqual(input_dict_2,
311 ['org-openroadm-otn-otu-interfaces:otu'])
313 def test_16_check_interface_och_spdrc(self):
314 response = test_utils.check_netconf_node_request(
315 "SPDR-SC1", "interface/XPDR1-NETWORK1-761:768")
316 self.assertEqual(response.status_code, requests.codes.ok)
317 res = response.json()
318 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
319 'administrative-state': 'inService',
320 'supporting-circuit-pack-name': 'CP1-CFP0',
321 'type': 'org-openroadm-interfaces:opticalChannel',
322 'supporting-port': 'CP1-CFP0-P1'
323 }, **res['interface'][0]),
326 self.assertDictEqual(
327 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
328 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
329 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
331 def test_17_check_interface_OTU4_spdrc(self):
332 response = test_utils.check_netconf_node_request(
333 "SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
334 self.assertEqual(response.status_code, requests.codes.ok)
335 res = response.json()
336 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
337 'administrative-state': 'inService',
338 'supporting-circuit-pack-name': 'CP1-CFP0',
339 'supporting-interface': 'XPDR1-NETWORK1-761:768',
340 'type': 'org-openroadm-interfaces:otnOtu',
341 'supporting-port': 'CP1-CFP0-P1'
343 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
344 'expected-sapi': 'Swfw02qXGyI=',
345 'tx-sapi': 'fuYZwEO660g=',
346 'expected-dapi': 'fuYZwEO660g=',
347 'rate': 'org-openroadm-otn-common-types:OTU4',
351 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
354 self.assertDictEqual(input_dict_2,
356 ['org-openroadm-otn-otu-interfaces:otu'])
358 def test_18_check_no_interface_ODU4_spdra(self):
359 response = test_utils.check_netconf_node_request(
360 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
361 self.assertEqual(response.status_code, requests.codes.conflict)
362 res = response.json()
364 {"error-type": "application", "error-tag": "data-missing",
365 "error-message": "Request could not be completed because the relevant data model content does not exist"},
366 res['errors']['error'])
368 def test_19_check_openroadm_topo_spdra(self):
369 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
370 self.assertEqual(response.status_code, requests.codes.ok)
371 res = response.json()
372 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
373 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
374 self.assertEqual({u'frequency': 196.1,
376 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
379 def test_20_check_openroadm_topo_ROADMA_SRG(self):
380 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
381 self.assertEqual(response.status_code, requests.codes.ok)
382 res = response.json()
383 freq_map = base64.b64decode(
384 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
385 freq_map_array = [int(x) for x in freq_map]
386 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
387 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
389 if ele['tp-id'] == 'SRG1-PP1-TXRX':
390 freq_map = base64.b64decode(
391 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
392 freq_map_array = [int(x) for x in freq_map]
393 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
394 if ele['tp-id'] == 'SRG1-PP2-TXRX':
395 self.assertNotIn('avail-freq-maps', dict.keys(ele))
398 def test_21_check_openroadm_topo_ROADMA_DEG(self):
399 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
400 self.assertEqual(response.status_code, requests.codes.ok)
401 res = response.json()
402 freq_map = base64.b64decode(
403 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
404 freq_map_array = [int(x) for x in freq_map]
405 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
406 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
408 if ele['tp-id'] == 'DEG2-CTP-TXRX':
409 freq_map = base64.b64decode(
410 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
411 freq_map_array = [int(x) for x in freq_map]
412 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
413 if ele['tp-id'] == 'DEG2-TTP-TXRX':
414 freq_map = base64.b64decode(
415 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
416 freq_map_array = [int(x) for x in freq_map]
417 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
420 def test_22_check_otn_topo_otu4_links(self):
421 response = test_utils.get_otn_topo_request()
422 self.assertEqual(response.status_code, requests.codes.ok)
423 res = response.json()
424 nb_links = len(res['network'][0]['ietf-network-topology:link'])
425 self.assertEqual(nb_links, 2)
426 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
427 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
428 for link in res['network'][0]['ietf-network-topology:link']:
429 self.assertIn(link['link-id'], listLinkId)
431 link['transportpce-topology:otn-link-type'], 'OTU4')
433 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
435 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
437 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
439 link['org-openroadm-common-network:opposite-link'], listLinkId)
441 # test service-create for ODU4 service from spdr to spdr
442 def test_23_create_ODU4_service(self):
443 self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
444 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
445 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
446 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
447 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
448 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
449 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
451 response = test_utils.service_create_request(self.cr_serv_sample_data)
452 self.assertEqual(response.status_code, requests.codes.ok)
453 res = response.json()
454 self.assertIn('PCE calculation in progress',
455 res['output']['configuration-response-common']['response-message'])
456 time.sleep(self.WAITING)
458 def test_24_get_ODU4_service1(self):
459 response = test_utils.get_service_list_request(
460 "services/service1-ODU4")
461 self.assertEqual(response.status_code, requests.codes.ok)
462 res = response.json()
464 res['services'][0]['administrative-state'], 'inService')
466 res['services'][0]['service-name'], 'service1-ODU4')
468 res['services'][0]['connection-type'], 'infrastructure')
470 res['services'][0]['lifecycle-state'], 'planned')
473 def test_25_check_interface_ODU4_spdra(self):
474 response = test_utils.check_netconf_node_request(
475 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
476 self.assertEqual(response.status_code, requests.codes.ok)
477 res = response.json()
478 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
479 'administrative-state': 'inService',
480 'supporting-circuit-pack-name': 'CP1-CFP0',
481 'supporting-interface': 'XPDR1-NETWORK1-OTU',
482 'type': 'org-openroadm-interfaces:otnOdu',
483 'supporting-port': 'CP1-CFP0-P1'}
484 # SAPI/DAPI are added in the Otu4 renderer
485 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
486 'rate': 'org-openroadm-otn-common-types:ODU4',
487 'expected-dapi': 'Swfw02qXGyI=',
488 'expected-sapi': 'fuYZwEO660g=',
489 'tx-dapi': 'fuYZwEO660g=',
490 'tx-sapi': 'Swfw02qXGyI='}
492 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
494 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
496 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
498 self.assertDictEqual(
499 {u'payload-type': u'21', u'exp-payload-type': u'21'},
500 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
502 def test_26_check_interface_ODU4_spdrc(self):
503 response = test_utils.check_netconf_node_request(
504 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
505 self.assertEqual(response.status_code, requests.codes.ok)
506 res = response.json()
507 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
508 'administrative-state': 'inService',
509 'supporting-circuit-pack-name': 'CP1-CFP0',
510 'supporting-interface': 'XPDR1-NETWORK1-OTU',
511 'type': 'org-openroadm-interfaces:otnOdu',
512 'supporting-port': 'CP1-CFP0-P1'}
513 # SAPI/DAPI are added in the Otu4 renderer
514 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
515 'rate': 'org-openroadm-otn-common-types:ODU4',
516 'tx-sapi': 'fuYZwEO660g=',
517 'tx-dapi': 'Swfw02qXGyI=',
518 'expected-sapi': 'Swfw02qXGyI=',
519 'expected-dapi': 'fuYZwEO660g='
521 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
523 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
525 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
527 self.assertDictEqual(
528 {u'payload-type': u'21', u'exp-payload-type': u'21'},
529 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
531 def test_27_check_otn_topo_links(self):
532 response = test_utils.get_otn_topo_request()
533 self.assertEqual(response.status_code, requests.codes.ok)
534 res = response.json()
535 nb_links = len(res['network'][0]['ietf-network-topology:link'])
536 self.assertEqual(nb_links, 4)
537 for link in res['network'][0]['ietf-network-topology:link']:
538 linkId = link['link-id']
539 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
540 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
542 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
544 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
545 elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
546 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
548 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
550 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
552 link['transportpce-topology:otn-link-type'], 'ODTU4')
554 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
555 self.assertIn(link['org-openroadm-common-network:opposite-link'],
556 ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
557 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
559 self.fail("this link should not exist")
561 def test_28_check_otn_topo_tp(self):
562 response = test_utils.get_otn_topo_request()
563 res = response.json()
564 for node in res['network'][0]['node']:
565 if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
566 tpList = node['ietf-network-topology:termination-point']
568 if tp['tp-id'] == 'XPDR1-NETWORK1':
569 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
570 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
572 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
573 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
574 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
576 # test service-create for 10GE service from spdr to spdr
577 def test_29_create_10GE_service(self):
578 self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
579 self.cr_serv_sample_data["input"]["connection-type"] = "service"
580 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
581 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
582 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
583 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
584 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
585 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
586 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
587 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
588 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
589 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
590 response = test_utils.service_create_request(self.cr_serv_sample_data)
591 self.assertEqual(response.status_code, requests.codes.ok)
592 res = response.json()
593 self.assertIn('PCE calculation in progress',
594 res['output']['configuration-response-common']['response-message'])
595 time.sleep(self.WAITING)
597 def test_30_get_10GE_service1(self):
598 response = test_utils.get_service_list_request(
599 "services/service1-10GE")
600 self.assertEqual(response.status_code, requests.codes.ok)
601 res = response.json()
603 res['services'][0]['administrative-state'], 'inService')
605 res['services'][0]['service-name'], 'service1-10GE')
607 res['services'][0]['connection-type'], 'service')
609 res['services'][0]['lifecycle-state'], 'planned')
612 def test_31_check_interface_10GE_CLIENT_spdra(self):
613 response = test_utils.check_netconf_node_request(
614 "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
615 self.assertEqual(response.status_code, requests.codes.ok)
616 res = response.json()
617 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
618 'administrative-state': 'inService',
619 'supporting-circuit-pack-name': 'CP1-SFP4',
620 'type': 'org-openroadm-interfaces:ethernetCsmacd',
621 'supporting-port': 'CP1-SFP4-P1'
623 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
625 self.assertDictEqual(
627 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
629 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
630 response = test_utils.check_netconf_node_request(
631 "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
632 self.assertEqual(response.status_code, requests.codes.ok)
633 res = response.json()
634 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
635 'administrative-state': 'inService',
636 'supporting-circuit-pack-name': 'CP1-SFP4',
637 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
638 'type': 'org-openroadm-interfaces:otnOdu',
639 'supporting-port': 'CP1-SFP4-P1'}
641 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
642 'rate': 'org-openroadm-otn-common-types:ODU2e',
643 'monitoring-mode': 'terminated'}
645 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
647 self.assertDictEqual(dict(input_dict_2,
648 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
649 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
650 self.assertDictEqual(
651 {u'payload-type': u'03', u'exp-payload-type': u'03'},
652 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
654 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
655 response = test_utils.check_netconf_node_request(
656 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
657 self.assertEqual(response.status_code, requests.codes.ok)
658 res = response.json()
659 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
660 'administrative-state': 'inService',
661 'supporting-circuit-pack-name': 'CP1-CFP0',
662 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
663 'type': 'org-openroadm-interfaces:otnOdu',
664 'supporting-port': 'CP1-CFP0-P1'}
666 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
667 'rate': 'org-openroadm-otn-common-types:ODU2e',
668 'monitoring-mode': 'monitored'}
669 input_dict_3 = {'trib-port-number': 1}
671 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
673 self.assertDictEqual(dict(input_dict_2,
674 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
675 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
676 self.assertDictEqual(dict(input_dict_3,
677 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
678 'parent-odu-allocation']),
679 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
680 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
683 def test_34_check_ODU2E_connection_spdra(self):
684 response = test_utils.check_netconf_node_request(
686 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
687 self.assertEqual(response.status_code, requests.codes.ok)
688 res = response.json()
691 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
692 'direction': 'bidirectional'
695 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
696 res['odu-connection'][0])
697 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
698 res['odu-connection'][0]['destination'])
699 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
700 res['odu-connection'][0]['source'])
702 def test_35_check_interface_10GE_CLIENT_spdrc(self):
703 response = test_utils.check_netconf_node_request(
704 "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
705 self.assertEqual(response.status_code, requests.codes.ok)
706 res = response.json()
707 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
708 'administrative-state': 'inService',
709 'supporting-circuit-pack-name': 'CP1-SFP4',
710 'type': 'org-openroadm-interfaces:ethernetCsmacd',
711 'supporting-port': 'CP1-SFP4-P1'
713 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
715 self.assertDictEqual(
717 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
719 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
720 response = test_utils.check_netconf_node_request(
721 "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
722 self.assertEqual(response.status_code, requests.codes.ok)
723 res = response.json()
724 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
725 'administrative-state': 'inService',
726 'supporting-circuit-pack-name': 'CP1-SFP4',
727 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
728 'type': 'org-openroadm-interfaces:otnOdu',
729 'supporting-port': 'CP1-SFP4-P1'}
731 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
732 'rate': 'org-openroadm-otn-common-types:ODU2e',
733 'monitoring-mode': 'terminated'}
735 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
737 self.assertDictEqual(dict(input_dict_2,
738 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
739 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
740 self.assertDictEqual(
741 {u'payload-type': u'03', u'exp-payload-type': u'03'},
742 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
744 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
745 response = test_utils.check_netconf_node_request(
746 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
747 self.assertEqual(response.status_code, requests.codes.ok)
748 res = response.json()
749 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
750 'administrative-state': 'inService',
751 'supporting-circuit-pack-name': 'CP1-CFP0',
752 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
753 'type': 'org-openroadm-interfaces:otnOdu',
754 'supporting-port': 'CP1-CFP0-P1'}
756 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
757 'rate': 'org-openroadm-otn-common-types:ODU2e',
758 'monitoring-mode': 'monitored'}
760 input_dict_3 = {'trib-port-number': 1}
762 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
764 self.assertDictEqual(dict(input_dict_2,
765 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
766 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
767 self.assertDictEqual(dict(input_dict_3,
768 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
769 'parent-odu-allocation']),
770 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
771 'parent-odu-allocation'])
774 'org-openroadm-otn-odu-interfaces:odu'][
775 'parent-odu-allocation']['trib-slots'])
777 def test_38_check_ODU2E_connection_spdrc(self):
778 response = test_utils.check_netconf_node_request(
780 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
781 self.assertEqual(response.status_code, requests.codes.ok)
782 res = response.json()
785 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
786 'direction': 'bidirectional'
789 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
790 res['odu-connection'][0])
791 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
792 res['odu-connection'][0]['destination'])
793 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
794 res['odu-connection'][0]['source'])
796 def test_39_check_otn_topo_links(self):
797 response = test_utils.get_otn_topo_request()
798 self.assertEqual(response.status_code, requests.codes.ok)
799 res = response.json()
800 nb_links = len(res['network'][0]['ietf-network-topology:link'])
801 self.assertEqual(nb_links, 4)
802 for link in res['network'][0]['ietf-network-topology:link']:
803 linkId = link['link-id']
804 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
805 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
807 link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
809 link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
811 def test_40_check_otn_topo_tp(self):
812 response = test_utils.get_otn_topo_request()
813 res = response.json()
814 for node in res['network'][0]['node']:
815 if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
816 tpList = node['ietf-network-topology:termination-point']
818 if tp['tp-id'] == 'XPDR1-NETWORK1':
819 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
820 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
821 tsPoolList = list(range(1, 9))
823 tsPoolList, xpdrTpPortConAt['ts-pool'])
825 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
827 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
829 def test_41_delete_10GE_service(self):
830 response = test_utils.service_delete_request("service1-10GE")
831 self.assertEqual(response.status_code, requests.codes.ok)
832 res = response.json()
833 self.assertIn('Renderer service delete in progress',
834 res['output']['configuration-response-common']['response-message'])
835 time.sleep(self.WAITING)
837 def test_42_check_service_list(self):
838 response = test_utils.get_service_list_request("")
839 self.assertEqual(response.status_code, requests.codes.ok)
840 res = response.json()
841 self.assertEqual(len(res['service-list']['services']), 2)
844 def test_43_check_no_ODU2e_connection_spdra(self):
845 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
846 self.assertEqual(response.status_code, requests.codes.ok)
847 res = response.json()
848 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
851 def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
852 response = test_utils.check_netconf_node_request(
853 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
854 self.assertEqual(response.status_code, requests.codes.conflict)
856 def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
857 response = test_utils.check_netconf_node_request(
858 "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
859 self.assertEqual(response.status_code, requests.codes.conflict)
861 def test_46_check_no_interface_10GE_CLIENT_spdra(self):
862 response = test_utils.check_netconf_node_request(
863 "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
864 self.assertEqual(response.status_code, requests.codes.conflict)
866 def test_47_check_otn_topo_links(self):
867 response = test_utils.get_otn_topo_request()
868 self.assertEqual(response.status_code, requests.codes.ok)
869 res = response.json()
870 nb_links = len(res['network'][0]['ietf-network-topology:link'])
871 self.assertEqual(nb_links, 4)
872 for link in res['network'][0]['ietf-network-topology:link']:
873 linkId = link['link-id']
874 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
875 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
877 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
879 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
881 def test_48_check_otn_topo_tp(self):
882 response = test_utils.get_otn_topo_request()
883 res = response.json()
884 for node in res['network'][0]['node']:
885 if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
886 tpList = node['ietf-network-topology:termination-point']
888 if tp['tp-id'] == 'XPDR1-NETWORK1':
889 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
890 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
892 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
894 def test_49_delete_ODU4_service(self):
895 response = test_utils.service_delete_request("service1-ODU4")
896 self.assertEqual(response.status_code, requests.codes.ok)
897 res = response.json()
898 self.assertIn('Renderer service delete in progress',
899 res['output']['configuration-response-common']['response-message'])
900 time.sleep(self.WAITING)
902 def test_50_check_service_list(self):
903 response = test_utils.get_service_list_request("")
904 self.assertEqual(response.status_code, requests.codes.ok)
905 res = response.json()
906 self.assertEqual(len(res['service-list']['services']), 1)
909 def test_51_check_no_interface_ODU4_spdra(self):
910 response = test_utils.check_netconf_node_request(
911 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
912 self.assertEqual(response.status_code, requests.codes.conflict)
914 def test_52_check_otn_topo_links(self):
915 self.test_22_check_otn_topo_otu4_links()
917 def test_53_check_otn_topo_tp(self):
918 response = test_utils.get_otn_topo_request()
919 res = response.json()
920 for node in res['network'][0]['node']:
921 if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
922 tpList = node['ietf-network-topology:termination-point']
924 if tp['tp-id'] == 'XPDR1-NETWORK1':
925 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
926 self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
928 'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
930 def test_54_delete_OCH_OTU4_service(self):
931 response = test_utils.service_delete_request("service1-OCH-OTU4")
932 self.assertEqual(response.status_code, requests.codes.ok)
933 res = response.json()
934 self.assertIn('Renderer service delete in progress',
935 res['output']['configuration-response-common']['response-message'])
936 time.sleep(self.WAITING)
938 def test_55_get_no_service(self):
939 response = test_utils.get_service_list_request("")
940 self.assertEqual(response.status_code, requests.codes.conflict)
941 res = response.json()
943 {"error-type": "application", "error-tag": "data-missing",
944 "error-message": "Request could not be completed because the relevant data model content does not exist"},
945 res['errors']['error'])
948 def test_56_check_no_interface_OTU4_spdra(self):
949 response = test_utils.check_netconf_node_request(
950 "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
951 self.assertEqual(response.status_code, requests.codes.conflict)
953 def test_57_check_no_interface_OCH_spdra(self):
954 response = test_utils.check_netconf_node_request(
955 "SPDR-SA1", "interface/XPDR1-NETWORK1-1")
956 self.assertEqual(response.status_code, requests.codes.conflict)
958 def test_58_getLinks_OtnTopology(self):
959 response = test_utils.get_otn_topo_request()
960 self.assertEqual(response.status_code, requests.codes.ok)
961 res = response.json()
962 self.assertNotIn('ietf-network-topology:link', res['network'][0])
964 def test_59_check_openroadm_topo_spdra(self):
965 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
966 self.assertEqual(response.status_code, requests.codes.ok)
967 res = response.json()
968 tp = res['node'][0]['ietf-network-topology:termination-point'][0]
969 self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
970 self.assertNotIn('wavelength', dict.keys(
971 tp[u'org-openroadm-network-topology:xpdr-network-attributes']))
974 def test_60_check_openroadm_topo_ROADMA_SRG(self):
975 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
976 self.assertEqual(response.status_code, requests.codes.ok)
977 res = response.json()
978 freq_map = base64.b64decode(
979 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
980 freq_map_array = [int(x) for x in freq_map]
981 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
982 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
984 if ele['tp-id'] == 'SRG1-PP1-TXRX':
985 freq_map = base64.b64decode(
986 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
987 freq_map_array = [int(x) for x in freq_map]
988 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
991 def test_61_check_openroadm_topo_ROADMA_DEG(self):
992 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
993 self.assertEqual(response.status_code, requests.codes.ok)
994 res = response.json()
995 freq_map = base64.b64decode(
996 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
997 freq_map_array = [int(x) for x in freq_map]
998 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
999 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1000 for ele in liste_tp:
1001 if ele['tp-id'] == 'DEG2-CTP-TXRX':
1002 freq_map = base64.b64decode(
1003 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1004 freq_map_array = [int(x) for x in freq_map]
1005 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1006 if ele['tp-id'] == 'DEG2-TTP-TXRX':
1007 freq_map = base64.b64decode(
1008 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1009 freq_map_array = [int(x) for x in freq_map]
1010 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1013 def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
1014 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "3", "1",
1015 "ROADM-A1", "1", "SRG1-PP2-TXRX")
1016 self.assertEqual(response.status_code, requests.codes.ok)
1017 res = response.json()
1018 self.assertIn('Xponder Roadm Link created successfully',
1019 res["output"]["result"])
1022 def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
1023 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "3", "1",
1024 "ROADM-A1", "1", "SRG1-PP2-TXRX")
1025 self.assertEqual(response.status_code, requests.codes.ok)
1026 res = response.json()
1027 self.assertIn('Roadm Xponder links created successfully',
1028 res["output"]["result"])
1031 def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
1032 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "3", "1",
1033 "ROADM-C1", "1", "SRG1-PP2-TXRX")
1034 self.assertEqual(response.status_code, requests.codes.ok)
1035 res = response.json()
1036 self.assertIn('Xponder Roadm Link created successfully',
1037 res["output"]["result"])
1040 def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
1041 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "3", "1",
1042 "ROADM-C1", "1", "SRG1-PP2-TXRX")
1043 self.assertEqual(response.status_code, requests.codes.ok)
1044 res = response.json()
1045 self.assertIn('Roadm Xponder links created successfully',
1046 res["output"]["result"])
1049 def test_66_create_OCH_OTU4_service_2(self):
1050 self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
1051 self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1052 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1053 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1054 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1055 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1056 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1057 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1058 self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1059 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1060 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1061 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1062 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1063 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1064 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1065 self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1066 response = test_utils.service_create_request(self.cr_serv_sample_data)
1067 self.assertEqual(response.status_code, requests.codes.ok)
1068 res = response.json()
1069 self.assertIn('PCE calculation in progress',
1070 res['output']['configuration-response-common']['response-message'])
1071 time.sleep(self.WAITING)
1073 def test_67_get_OCH_OTU4_service2(self):
1074 response = test_utils.get_service_list_request(
1075 "services/service2-OCH-OTU4")
1076 self.assertEqual(response.status_code, requests.codes.ok)
1077 res = response.json()
1079 res['services'][0]['administrative-state'], 'inService')
1081 res['services'][0]['service-name'], 'service2-OCH-OTU4')
1083 res['services'][0]['connection-type'], 'infrastructure')
1085 res['services'][0]['lifecycle-state'], 'planned')
1088 def test_68_create_ODU4_service_2(self):
1089 self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
1090 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
1091 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1092 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1093 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1094 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1095 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1096 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1097 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
1098 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1099 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1100 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1101 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1102 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1103 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1105 response = test_utils.service_create_request(self.cr_serv_sample_data)
1106 self.assertEqual(response.status_code, requests.codes.ok)
1107 res = response.json()
1108 self.assertIn('PCE calculation in progress',
1109 res['output']['configuration-response-common']['response-message'])
1110 time.sleep(self.WAITING)
1112 def test_69_get_ODU4_service2(self):
1113 response = test_utils.get_service_list_request(
1114 "services/service2-ODU4")
1115 self.assertEqual(response.status_code, requests.codes.ok)
1116 res = response.json()
1118 res['services'][0]['administrative-state'], 'inService')
1120 res['services'][0]['service-name'], 'service2-ODU4')
1122 res['services'][0]['connection-type'], 'infrastructure')
1124 res['services'][0]['lifecycle-state'], 'planned')
1127 def test_70_create_1GE_service(self):
1128 self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
1129 self.cr_serv_sample_data["input"]["connection-type"] = "service"
1130 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
1131 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1132 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
1133 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1134 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
1135 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1136 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1137 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1138 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1139 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
1140 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1141 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1142 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1143 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1144 response = test_utils.service_create_request(self.cr_serv_sample_data)
1145 self.assertEqual(response.status_code, requests.codes.ok)
1146 res = response.json()
1147 self.assertIn('PCE calculation in progress',
1148 res['output']['configuration-response-common']['response-message'])
1149 time.sleep(self.WAITING)
1151 def test_71_get_1GE_service1(self):
1152 response = test_utils.get_service_list_request("services/service1-1GE")
1153 self.assertEqual(response.status_code, requests.codes.ok)
1154 res = response.json()
1156 res['services'][0]['administrative-state'], 'inService')
1158 res['services'][0]['service-name'], 'service1-1GE')
1160 res['services'][0]['connection-type'], 'service')
1162 res['services'][0]['lifecycle-state'], 'planned')
1165 def test_72_check_interface_1GE_CLIENT_spdra(self):
1166 response = test_utils.check_netconf_node_request(
1167 "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1168 self.assertEqual(response.status_code, requests.codes.ok)
1169 res = response.json()
1170 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1171 'administrative-state': 'inService',
1172 'supporting-circuit-pack-name': 'CP1-SFP4',
1173 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1174 'supporting-port': 'CP1-SFP4-P1'
1176 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1177 res['interface'][0])
1178 self.assertDictEqual(
1180 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1182 def test_73_check_interface_ODU0_CLIENT_spdra(self):
1183 response = test_utils.check_netconf_node_request(
1184 "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
1185 self.assertEqual(response.status_code, requests.codes.ok)
1186 res = response.json()
1187 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
1188 'administrative-state': 'inService',
1189 'supporting-circuit-pack-name': 'CP3-SFP1',
1190 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1191 'type': 'org-openroadm-interfaces:otnOdu',
1192 'supporting-port': 'CP3-SFP1-P1'}
1194 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1195 'rate': 'org-openroadm-otn-common-types:ODU0',
1196 'monitoring-mode': 'terminated'}
1198 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1199 res['interface'][0])
1200 self.assertDictEqual(dict(input_dict_2,
1201 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1202 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1203 self.assertDictEqual(
1204 {u'payload-type': u'07', u'exp-payload-type': u'07'},
1205 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1207 def test_74_check_interface_ODU0_NETWORK_spdra(self):
1208 response = test_utils.check_netconf_node_request(
1209 "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
1210 self.assertEqual(response.status_code, requests.codes.ok)
1211 res = response.json()
1212 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
1213 'administrative-state': 'inService',
1214 'supporting-circuit-pack-name': 'CP3-CFP0',
1215 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1216 'type': 'org-openroadm-interfaces:otnOdu',
1217 'supporting-port': 'CP3-CFP0-P1'}
1219 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1220 'rate': 'org-openroadm-otn-common-types:ODU0',
1221 'monitoring-mode': 'monitored'}
1222 input_dict_3 = {'trib-port-number': 1}
1224 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1225 res['interface'][0])
1226 self.assertDictEqual(dict(input_dict_2,
1227 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1228 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1229 self.assertDictEqual(dict(input_dict_3,
1230 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1231 'parent-odu-allocation']),
1232 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1233 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1236 def test_75_check_ODU0_connection_spdra(self):
1237 response = test_utils.check_netconf_node_request(
1239 "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1240 self.assertEqual(response.status_code, requests.codes.ok)
1241 res = response.json()
1244 'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1245 'direction': 'bidirectional'
1248 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1249 res['odu-connection'][0])
1250 self.assertDictEqual({u'dst-if': u'XPDR3-NETWORK1-ODU0-service1-1GE'},
1251 res['odu-connection'][0]['destination'])
1252 self.assertDictEqual({u'src-if': u'XPDR3-CLIENT1-ODU0-service1-1GE'},
1253 res['odu-connection'][0]['source'])
1255 def test_76_check_interface_1GE_CLIENT_spdrc(self):
1256 response = test_utils.check_netconf_node_request(
1257 "SPDR-SC1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1258 self.assertEqual(response.status_code, requests.codes.ok)
1259 res = response.json()
1260 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1261 'administrative-state': 'inService',
1262 'supporting-circuit-pack-name': 'CP3-SFP1',
1263 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1264 'supporting-port': 'CP3-SFP1-P1'
1266 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1267 res['interface'][0])
1268 self.assertDictEqual(
1270 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1272 def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1273 response = test_utils.check_netconf_node_request(
1274 "SPDR-SC1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
1275 self.assertEqual(response.status_code, requests.codes.ok)
1276 res = response.json()
1277 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
1278 'administrative-state': 'inService',
1279 'supporting-circuit-pack-name': 'CP3-SFP1',
1280 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1281 'type': 'org-openroadm-interfaces:otnOdu',
1282 'supporting-port': 'CP3-SFP1-P1'}
1284 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1285 'rate': 'org-openroadm-otn-common-types:ODU0',
1286 'monitoring-mode': 'terminated'}
1288 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1289 res['interface'][0])
1290 self.assertDictEqual(dict(input_dict_2,
1291 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1292 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1293 self.assertDictEqual(
1294 {u'payload-type': u'07', u'exp-payload-type': u'07'},
1295 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1297 def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1298 response = test_utils.check_netconf_node_request(
1299 "SPDR-SC1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
1300 self.assertEqual(response.status_code, requests.codes.ok)
1301 res = response.json()
1302 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
1303 'administrative-state': 'inService',
1304 'supporting-circuit-pack-name': 'CP3-CFP0',
1305 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1306 'type': 'org-openroadm-interfaces:otnOdu',
1307 'supporting-port': 'CP3-CFP0-P1'}
1309 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1310 'rate': 'org-openroadm-otn-common-types:ODU0',
1311 'monitoring-mode': 'monitored'}
1313 input_dict_3 = {'trib-port-number': 1}
1315 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1316 res['interface'][0])
1317 self.assertDictEqual(dict(input_dict_2,
1318 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1319 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1320 self.assertDictEqual(dict(input_dict_3,
1321 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1322 'parent-odu-allocation']),
1323 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1324 'parent-odu-allocation'])
1326 res['interface'][0][
1327 'org-openroadm-otn-odu-interfaces:odu'][
1328 'parent-odu-allocation']['trib-slots'])
1330 def test_79_check_ODU0_connection_spdrc(self):
1331 response = test_utils.check_netconf_node_request(
1333 "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1334 self.assertEqual(response.status_code, requests.codes.ok)
1335 res = response.json()
1338 'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1339 'direction': 'bidirectional'
1342 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1343 res['odu-connection'][0])
1344 self.assertDictEqual({u'dst-if': u'XPDR3-NETWORK1-ODU0-service1-1GE'},
1345 res['odu-connection'][0]['destination'])
1346 self.assertDictEqual({u'src-if': u'XPDR3-CLIENT1-ODU0-service1-1GE'},
1347 res['odu-connection'][0]['source'])
1349 def test_80_check_otn_topo_links(self):
1350 response = test_utils.get_otn_topo_request()
1351 self.assertEqual(response.status_code, requests.codes.ok)
1352 res = response.json()
1353 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1354 self.assertEqual(nb_links, 4)
1355 for link in res['network'][0]['ietf-network-topology:link']:
1356 linkId = link['link-id']
1357 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1358 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1360 link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1362 link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1364 def test_81_check_otn_topo_tp(self):
1365 response = test_utils.get_otn_topo_request()
1366 res = response.json()
1367 for node in res['network'][0]['node']:
1368 if node['node-id'] == 'SPDR-SA1-XPDR3' or 'SPDR-SC1-XPDR3':
1369 tpList = node['ietf-network-topology:termination-point']
1371 if tp['tp-id'] == 'XPDR3-NETWORK1':
1372 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1373 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1374 tsPoolList = list(range(1, 2))
1376 tsPoolList, xpdrTpPortConAt['ts-pool'])
1378 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1380 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1382 def test_82_delete_1GE_service(self):
1383 response = test_utils.service_delete_request("service1-1GE")
1384 self.assertEqual(response.status_code, requests.codes.ok)
1385 res = response.json()
1386 self.assertIn('Renderer service delete in progress',
1387 res['output']['configuration-response-common']['response-message'])
1388 time.sleep(self.WAITING)
1390 def test_83_check_service_list(self):
1391 response = test_utils.get_service_list_request("")
1392 self.assertEqual(response.status_code, requests.codes.ok)
1393 res = response.json()
1394 self.assertEqual(len(res['service-list']['services']), 2)
1397 def test_84_check_no_ODU0_connection_spdra(self):
1398 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1399 self.assertEqual(response.status_code, requests.codes.ok)
1400 res = response.json()
1401 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1404 def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1405 response = test_utils.check_netconf_node_request(
1406 "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1")
1407 self.assertEqual(response.status_code, requests.codes.conflict)
1409 def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1410 response = test_utils.check_netconf_node_request(
1411 "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1")
1412 self.assertEqual(response.status_code, requests.codes.conflict)
1414 def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1415 response = test_utils.check_netconf_node_request(
1416 "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1417 self.assertEqual(response.status_code, requests.codes.conflict)
1419 def test_88_check_otn_topo_links(self):
1420 response = test_utils.get_otn_topo_request()
1421 self.assertEqual(response.status_code, requests.codes.ok)
1422 res = response.json()
1423 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1424 self.assertEqual(nb_links, 4)
1425 for link in res['network'][0]['ietf-network-topology:link']:
1426 linkId = link['link-id']
1427 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1428 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1430 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1432 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1434 def test_89_check_otn_topo_tp(self):
1435 response = test_utils.get_otn_topo_request()
1436 res = response.json()
1437 for node in res['network'][0]['node']:
1438 if (node['node-id'] == 'SPDR-SA1-XPDR3' or 'SPDR-SC1-XPDR3'):
1439 tpList = node['ietf-network-topology:termination-point']
1441 if tp['tp-id'] == 'XPDR3-NETWORK1':
1442 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1443 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1445 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1447 def test_90_delete_ODU4_service(self):
1448 response = test_utils.service_delete_request("service2-ODU4")
1449 self.assertEqual(response.status_code, requests.codes.ok)
1450 res = response.json()
1451 self.assertIn('Renderer service delete in progress',
1452 res['output']['configuration-response-common']['response-message'])
1453 time.sleep(self.WAITING)
1455 def test_91_delete_OCH_OTU4_service(self):
1456 response = test_utils.service_delete_request("service2-OCH-OTU4")
1457 self.assertEqual(response.status_code, requests.codes.ok)
1458 res = response.json()
1459 self.assertIn('Renderer service delete in progress',
1460 res['output']['configuration-response-common']['response-message'])
1461 time.sleep(self.WAITING)
1463 def test_92_disconnect_xponders_from_roadm(self):
1464 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1465 response = test_utils.get_ordm_topo_request("")
1466 self.assertEqual(response.status_code, requests.codes.ok)
1467 res = response.json()
1468 links = res['network'][0]['ietf-network-topology:link']
1470 if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1471 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
1472 link_name = link["link-id"]
1473 response = test_utils.delete_request(url+link_name)
1474 self.assertEqual(response.status_code, requests.codes.ok)
1476 def test_93_check_openroadm_topology(self):
1477 response = test_utils.get_ordm_topo_request("")
1478 self.assertEqual(response.status_code, requests.codes.ok)
1479 res = response.json()
1480 links = res['network'][0]['ietf-network-topology:link']
1481 self.assertEqual(18, len(links), 'Topology should contain 18 links')
1483 def test_94_disconnect_spdrA(self):
1484 response = test_utils.unmount_device("SPDR-SA1")
1485 self.assertEqual(response.status_code, requests.codes.ok,
1486 test_utils.CODE_SHOULD_BE_200)
1488 def test_95_disconnect_spdrC(self):
1489 response = test_utils.unmount_device("SPDR-SC1")
1490 self.assertEqual(response.status_code, requests.codes.ok,
1491 test_utils.CODE_SHOULD_BE_200)
1493 def test_96_disconnect_roadmA(self):
1494 response = test_utils.unmount_device("ROADM-A1")
1495 self.assertEqual(response.status_code, requests.codes.ok,
1496 test_utils.CODE_SHOULD_BE_200)
1498 def test_97_disconnect_roadmC(self):
1499 response = test_utils.unmount_device("ROADM-C1")
1500 self.assertEqual(response.status_code, requests.codes.ok,
1501 test_utils.CODE_SHOULD_BE_200)
1504 if __name__ == "__main__":
1505 unittest.main(verbosity=2)