3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
28 class TransportPCEtesting(unittest.TestCase):
31 WAITING = 20 # nominal value is 300
32 NODE_VERSION = '2.2.1'
34 cr_serv_sample_data = {"input": {
35 "sdnc-request-header": {
36 "request-id": "request-1",
37 "rpc-action": "service-create",
38 "request-system-id": "appname"
40 "service-name": "service1-OCH-OTU4",
41 "common-id": "commonId",
42 "connection-type": "infrastructure",
44 "service-rate": "100",
45 "node-id": "SPDR-SA1",
46 "service-format": "OTU",
47 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
51 "port-device-name": "SPDR-SA1-XPDR1",
53 "port-name": "XPDR1-NETWORK1",
54 "port-rack": "000000.00",
55 "port-shelf": "Chassis#1"
58 "lgx-device-name": "Some lgx-device-name",
59 "lgx-port-name": "Some lgx-port-name",
60 "lgx-port-rack": "000000.00",
61 "lgx-port-shelf": "00"
67 "port-device-name": "SPDR-SA1-XPDR1",
69 "port-name": "XPDR1-NETWORK1",
70 "port-rack": "000000.00",
71 "port-shelf": "Chassis#1"
74 "lgx-device-name": "Some lgx-device-name",
75 "lgx-port-name": "Some lgx-port-name",
76 "lgx-port-rack": "000000.00",
77 "lgx-port-shelf": "00"
84 "service-rate": "100",
85 "node-id": "SPDR-SC1",
86 "service-format": "OTU",
87 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
91 "port-device-name": "SPDR-SC1-XPDR1",
93 "port-name": "XPDR1-NETWORK1",
94 "port-rack": "000000.00",
95 "port-shelf": "Chassis#1"
98 "lgx-device-name": "Some lgx-device-name",
99 "lgx-port-name": "Some lgx-port-name",
100 "lgx-port-rack": "000000.00",
101 "lgx-port-shelf": "00"
107 "port-device-name": "SPDR-SC1-XPDR1",
108 "port-type": "fixed",
109 "port-name": "XPDR1-NETWORK1",
110 "port-rack": "000000.00",
111 "port-shelf": "Chassis#1"
114 "lgx-device-name": "Some lgx-device-name",
115 "lgx-port-name": "Some lgx-port-name",
116 "lgx-port-rack": "000000.00",
117 "lgx-port-shelf": "00"
123 "due-date": "2018-06-15T00:00:01Z",
124 "operator-contact": "pw1234"
130 cls.processes = test_utils.start_tpce()
131 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
132 ('roadma', cls.NODE_VERSION),
133 ('roadmc', cls.NODE_VERSION),
134 ('spdrc', cls.NODE_VERSION)])
137 def tearDownClass(cls):
138 # pylint: disable=not-an-iterable
139 for process in cls.processes:
140 test_utils.shutdown_process(process)
141 print("all processes killed")
146 def test_01_connect_spdrA(self):
147 response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
148 self.assertEqual(response.status_code,
149 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
151 def test_02_connect_spdrC(self):
152 response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
153 self.assertEqual(response.status_code,
154 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
156 def test_03_connect_rdmA(self):
157 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
158 self.assertEqual(response.status_code,
159 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
161 def test_04_connect_rdmC(self):
162 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
163 self.assertEqual(response.status_code,
164 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
166 def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
167 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
168 "ROADM-A1", "1", "SRG1-PP1-TXRX")
169 self.assertEqual(response.status_code, requests.codes.ok)
170 res = response.json()
171 self.assertIn('Xponder Roadm Link created successfully',
172 res["output"]["result"])
175 def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
176 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
177 "ROADM-A1", "1", "SRG1-PP1-TXRX")
178 self.assertEqual(response.status_code, requests.codes.ok)
179 res = response.json()
180 self.assertIn('Roadm Xponder links created successfully',
181 res["output"]["result"])
184 def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
185 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
186 "ROADM-C1", "1", "SRG1-PP1-TXRX")
187 self.assertEqual(response.status_code, requests.codes.ok)
188 res = response.json()
189 self.assertIn('Xponder Roadm Link created successfully',
190 res["output"]["result"])
193 def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
194 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
195 "ROADM-C1", "1", "SRG1-PP1-TXRX")
196 self.assertEqual(response.status_code, requests.codes.ok)
197 res = response.json()
198 self.assertIn('Roadm Xponder links created successfully',
199 res["output"]["result"])
202 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
203 # Config ROADMA-ROADMC oms-attributes
205 "auto-spanloss": "true",
206 "spanloss-base": 11.4,
207 "spanloss-current": 12,
208 "engineered-spanloss": 12.2,
209 "link-concatenation": [{
212 "SRLG-length": 100000,
214 response = test_utils.add_oms_attr_request(
215 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
216 self.assertEqual(response.status_code, requests.codes.created)
218 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
219 # Config ROADMC-ROADMA oms-attributes
221 "auto-spanloss": "true",
222 "spanloss-base": 11.4,
223 "spanloss-current": 12,
224 "engineered-spanloss": 12.2,
225 "link-concatenation": [{
228 "SRLG-length": 100000,
230 response = test_utils.add_oms_attr_request(
231 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
232 self.assertEqual(response.status_code, requests.codes.created)
234 # test service-create for OCH-OTU4 service from spdr to spdr
235 def test_11_check_otn_topology(self):
236 response = test_utils.get_otn_topo_request()
237 self.assertEqual(response.status_code, requests.codes.ok)
238 res = response.json()
239 nbNode = len(res['network'][0]['node'])
240 self.assertEqual(nbNode, 6, 'There should be 6 nodes')
241 self.assertNotIn('ietf-network-topology:link', res['network'][0])
243 def test_12_create_OCH_OTU4_service(self):
244 response = test_utils.service_create_request(self.cr_serv_sample_data)
245 self.assertEqual(response.status_code, requests.codes.ok)
246 res = response.json()
247 self.assertIn('PCE calculation in progress',
248 res['output']['configuration-response-common']['response-message'])
249 time.sleep(self.WAITING)
251 def test_13_get_OCH_OTU4_service1(self):
252 response = test_utils.get_service_list_request(
253 "services/service1-OCH-OTU4")
254 self.assertEqual(response.status_code, requests.codes.ok)
255 res = response.json()
257 res['services'][0]['administrative-state'], 'inService')
259 res['services'][0]['service-name'], 'service1-OCH-OTU4')
261 res['services'][0]['connection-type'], 'infrastructure')
263 res['services'][0]['lifecycle-state'], 'planned')
266 # Check correct configuration of devices
267 def test_14_check_interface_och_spdra(self):
268 response = test_utils.check_netconf_node_request(
269 "SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
270 self.assertEqual(response.status_code, requests.codes.ok)
271 res = response.json()
272 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
273 'administrative-state': 'inService',
274 'supporting-circuit-pack-name': 'CP1-CFP0',
275 'type': 'org-openroadm-interfaces:opticalChannel',
276 'supporting-port': 'CP1-CFP0-P1'
277 }, **res['interface'][0]),
280 self.assertDictEqual(
281 {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
282 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
283 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
285 def test_15_check_interface_OTU4_spdra(self):
286 response = test_utils.check_netconf_node_request(
287 "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
288 self.assertEqual(response.status_code, requests.codes.ok)
289 res = response.json()
290 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
291 'administrative-state': 'inService',
292 'supporting-circuit-pack-name': 'CP1-CFP0',
293 'supporting-interface': 'XPDR1-NETWORK1-761:768',
294 'type': 'org-openroadm-interfaces:otnOtu',
295 'supporting-port': 'CP1-CFP0-P1'
297 input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
298 'expected-dapi': 'H/OelLynehI=',
299 'tx-dapi': 'AMf1n5hK6Xkk',
300 'expected-sapi': 'AMf1n5hK6Xkk',
301 'rate': 'org-openroadm-otn-common-types:OTU4',
304 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
307 self.assertDictEqual(input_dict_2,
309 ['org-openroadm-otn-otu-interfaces:otu'])
311 def test_16_check_interface_och_spdrc(self):
312 response = test_utils.check_netconf_node_request(
313 "SPDR-SC1", "interface/XPDR1-NETWORK1-761:768")
314 self.assertEqual(response.status_code, requests.codes.ok)
315 res = response.json()
316 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
317 'administrative-state': 'inService',
318 'supporting-circuit-pack-name': 'CP1-CFP0',
319 'type': 'org-openroadm-interfaces:opticalChannel',
320 'supporting-port': 'CP1-CFP0-P1'
321 }, **res['interface'][0]),
324 self.assertDictEqual(
325 {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
326 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
327 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
329 def test_17_check_interface_OTU4_spdrc(self):
330 response = test_utils.check_netconf_node_request(
331 "SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
332 self.assertEqual(response.status_code, requests.codes.ok)
333 res = response.json()
334 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
335 'administrative-state': 'inService',
336 'supporting-circuit-pack-name': 'CP1-CFP0',
337 'supporting-interface': 'XPDR1-NETWORK1-761:768',
338 'type': 'org-openroadm-interfaces:otnOtu',
339 'supporting-port': 'CP1-CFP0-P1'
341 input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
342 'expected-sapi': 'H/OelLynehI=',
343 'tx-sapi': 'AMf1n5hK6Xkk',
344 'expected-dapi': 'AMf1n5hK6Xkk',
345 'rate': 'org-openroadm-otn-common-types:OTU4',
349 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
352 self.assertDictEqual(input_dict_2,
354 ['org-openroadm-otn-otu-interfaces:otu'])
356 def test_18_check_no_interface_ODU4_spdra(self):
357 response = test_utils.check_netconf_node_request(
358 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
359 self.assertEqual(response.status_code, requests.codes.conflict)
360 res = response.json()
362 {"error-type": "application", "error-tag": "data-missing",
363 "error-message": "Request could not be completed because the relevant data model content does not exist"},
364 res['errors']['error'])
366 def test_19_check_openroadm_topo_spdra(self):
367 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
368 self.assertEqual(response.status_code, requests.codes.ok)
369 res = response.json()
370 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
371 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
372 self.assertEqual({'frequency': 196.1,
374 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
377 def test_20_check_openroadm_topo_ROADMA_SRG(self):
378 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
379 self.assertEqual(response.status_code, requests.codes.ok)
380 res = response.json()
381 freq_map = base64.b64decode(
382 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
383 freq_map_array = [int(x) for x in freq_map]
384 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
385 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
387 if ele['tp-id'] == 'SRG1-PP1-TXRX':
388 freq_map = base64.b64decode(
389 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
390 freq_map_array = [int(x) for x in freq_map]
391 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
392 if ele['tp-id'] == 'SRG1-PP2-TXRX':
393 self.assertNotIn('avail-freq-maps', dict.keys(ele))
396 def test_21_check_openroadm_topo_ROADMA_DEG(self):
397 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
398 self.assertEqual(response.status_code, requests.codes.ok)
399 res = response.json()
400 freq_map = base64.b64decode(
401 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
402 freq_map_array = [int(x) for x in freq_map]
403 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
404 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
406 if ele['tp-id'] == 'DEG2-CTP-TXRX':
407 freq_map = base64.b64decode(
408 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
409 freq_map_array = [int(x) for x in freq_map]
410 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
411 if ele['tp-id'] == 'DEG2-TTP-TXRX':
412 freq_map = base64.b64decode(
413 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
414 freq_map_array = [int(x) for x in freq_map]
415 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
418 def test_22_check_otn_topo_otu4_links(self):
419 response = test_utils.get_otn_topo_request()
420 self.assertEqual(response.status_code, requests.codes.ok)
421 res = response.json()
422 nb_links = len(res['network'][0]['ietf-network-topology:link'])
423 self.assertEqual(nb_links, 2)
424 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
425 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
426 for link in res['network'][0]['ietf-network-topology:link']:
427 self.assertIn(link['link-id'], listLinkId)
429 link['transportpce-topology:otn-link-type'], 'OTU4')
431 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
433 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
435 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
437 link['org-openroadm-common-network:opposite-link'], listLinkId)
439 # test service-create for ODU4 service from spdr to spdr
440 def test_23_create_ODU4_service(self):
441 self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
442 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
443 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
444 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
445 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
446 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
447 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
449 response = test_utils.service_create_request(self.cr_serv_sample_data)
450 self.assertEqual(response.status_code, requests.codes.ok)
451 res = response.json()
452 self.assertIn('PCE calculation in progress',
453 res['output']['configuration-response-common']['response-message'])
454 time.sleep(self.WAITING)
456 def test_24_get_ODU4_service1(self):
457 response = test_utils.get_service_list_request(
458 "services/service1-ODU4")
459 self.assertEqual(response.status_code, requests.codes.ok)
460 res = response.json()
462 res['services'][0]['administrative-state'], 'inService')
464 res['services'][0]['service-name'], 'service1-ODU4')
466 res['services'][0]['connection-type'], 'infrastructure')
468 res['services'][0]['lifecycle-state'], 'planned')
471 def test_25_check_interface_ODU4_spdra(self):
472 response = test_utils.check_netconf_node_request(
473 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
474 self.assertEqual(response.status_code, requests.codes.ok)
475 res = response.json()
476 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
477 'administrative-state': 'inService',
478 'supporting-circuit-pack-name': 'CP1-CFP0',
479 'supporting-interface': 'XPDR1-NETWORK1-OTU',
480 'type': 'org-openroadm-interfaces:otnOdu',
481 'supporting-port': 'CP1-CFP0-P1',
483 'description': 'TBD'}
484 # SAPI/DAPI are added in the Otu4 renderer
485 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
486 'rate': 'org-openroadm-otn-common-types:ODU4',
487 'monitoring-mode': 'terminated',
488 'expected-dapi': 'H/OelLynehI=',
489 'expected-sapi': 'AMf1n5hK6Xkk',
490 'tx-dapi': 'AMf1n5hK6Xkk',
491 'tx-sapi': 'H/OelLynehI='}
493 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
495 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
497 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
499 self.assertDictEqual(
500 {'payload-type': '21', 'exp-payload-type': '21'},
501 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
503 def test_26_check_interface_ODU4_spdrc(self):
504 response = test_utils.check_netconf_node_request(
505 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
506 self.assertEqual(response.status_code, requests.codes.ok)
507 res = response.json()
508 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
509 'administrative-state': 'inService',
510 'supporting-circuit-pack-name': 'CP1-CFP0',
511 'supporting-interface': 'XPDR1-NETWORK1-OTU',
512 'type': 'org-openroadm-interfaces:otnOdu',
513 'supporting-port': 'CP1-CFP0-P1',
515 'description': 'TBD'}
516 # SAPI/DAPI are added in the Otu4 renderer
517 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
518 'rate': 'org-openroadm-otn-common-types:ODU4',
519 'monitoring-mode': 'terminated',
520 'tx-sapi': 'AMf1n5hK6Xkk',
521 'tx-dapi': 'H/OelLynehI=',
522 'expected-sapi': 'H/OelLynehI=',
523 'expected-dapi': 'AMf1n5hK6Xkk'
525 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
527 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
529 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
531 self.assertDictEqual(
532 {'payload-type': '21', 'exp-payload-type': '21'},
533 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
535 def test_27_check_otn_topo_links(self):
536 response = test_utils.get_otn_topo_request()
537 self.assertEqual(response.status_code, requests.codes.ok)
538 res = response.json()
539 nb_links = len(res['network'][0]['ietf-network-topology:link'])
540 self.assertEqual(nb_links, 4)
541 for link in res['network'][0]['ietf-network-topology:link']:
542 linkId = link['link-id']
543 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
544 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
546 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
548 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
549 elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
550 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
552 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
554 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
556 link['transportpce-topology:otn-link-type'], 'ODTU4')
558 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
559 self.assertIn(link['org-openroadm-common-network:opposite-link'],
560 ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
561 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
563 self.fail("this link should not exist")
565 def test_28_check_otn_topo_tp(self):
566 response = test_utils.get_otn_topo_request()
567 res = response.json()
568 for node in res['network'][0]['node']:
569 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
570 tpList = node['ietf-network-topology:termination-point']
572 if tp['tp-id'] == 'XPDR1-NETWORK1':
573 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
574 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
576 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
577 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
578 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
580 # test service-create for 10GE service from spdr to spdr
581 def test_29_create_10GE_service(self):
582 self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
583 self.cr_serv_sample_data["input"]["connection-type"] = "service"
584 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
585 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
586 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
587 self.cr_serv_sample_data["input"]["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
588 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
589 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
590 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
591 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
592 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
593 self.cr_serv_sample_data["input"]["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
594 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
595 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
596 response = test_utils.service_create_request(self.cr_serv_sample_data)
597 self.assertEqual(response.status_code, requests.codes.ok)
598 res = response.json()
599 self.assertIn('PCE calculation in progress',
600 res['output']['configuration-response-common']['response-message'])
601 time.sleep(self.WAITING)
603 def test_30_get_10GE_service1(self):
604 response = test_utils.get_service_list_request(
605 "services/service1-10GE")
606 self.assertEqual(response.status_code, requests.codes.ok)
607 res = response.json()
609 res['services'][0]['administrative-state'], 'inService')
611 res['services'][0]['service-name'], 'service1-10GE')
613 res['services'][0]['connection-type'], 'service')
615 res['services'][0]['lifecycle-state'], 'planned')
618 def test_31_check_interface_10GE_CLIENT_spdra(self):
619 response = test_utils.check_netconf_node_request(
620 "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
621 self.assertEqual(response.status_code, requests.codes.ok)
622 res = response.json()
623 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
624 'administrative-state': 'inService',
625 'supporting-circuit-pack-name': 'CP1-SFP4',
626 'type': 'org-openroadm-interfaces:ethernetCsmacd',
627 'supporting-port': 'CP1-SFP4-P1'
629 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
631 self.assertDictEqual(
633 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
635 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
636 response = test_utils.check_netconf_node_request(
637 "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e")
638 self.assertEqual(response.status_code, requests.codes.ok)
639 res = response.json()
640 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
641 'administrative-state': 'inService',
642 'supporting-circuit-pack-name': 'CP1-SFP4',
643 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
644 'type': 'org-openroadm-interfaces:otnOdu',
645 'supporting-port': 'CP1-SFP4-P1'}
647 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
648 'rate': 'org-openroadm-otn-common-types:ODU2e',
649 'monitoring-mode': 'terminated'}
651 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
653 self.assertDictEqual(dict(input_dict_2,
654 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
655 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
656 self.assertDictEqual(
657 {'payload-type': '03', 'exp-payload-type': '03'},
658 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
660 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
661 response = test_utils.check_netconf_node_request(
662 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e")
663 self.assertEqual(response.status_code, requests.codes.ok)
664 res = response.json()
665 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
666 'administrative-state': 'inService',
667 'supporting-circuit-pack-name': 'CP1-CFP0',
668 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
669 'type': 'org-openroadm-interfaces:otnOdu',
670 'supporting-port': 'CP1-CFP0-P1'}
672 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
673 'rate': 'org-openroadm-otn-common-types:ODU2e',
674 'monitoring-mode': 'monitored'}
675 input_dict_3 = {'trib-port-number': 1}
677 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
679 self.assertDictEqual(dict(input_dict_2,
680 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
681 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
682 self.assertDictEqual(dict(input_dict_3,
683 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
684 'parent-odu-allocation']),
685 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
686 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
689 def test_34_check_ODU2E_connection_spdra(self):
690 response = test_utils.check_netconf_node_request(
692 "odu-connection/XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e")
693 self.assertEqual(response.status_code, requests.codes.ok)
694 res = response.json()
697 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
698 'direction': 'bidirectional'
701 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
702 res['odu-connection'][0])
703 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
704 res['odu-connection'][0]['destination'])
705 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
706 res['odu-connection'][0]['source'])
708 def test_35_check_interface_10GE_CLIENT_spdrc(self):
709 response = test_utils.check_netconf_node_request(
710 "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
711 self.assertEqual(response.status_code, requests.codes.ok)
712 res = response.json()
713 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
714 'administrative-state': 'inService',
715 'supporting-circuit-pack-name': 'CP1-SFP4',
716 'type': 'org-openroadm-interfaces:ethernetCsmacd',
717 'supporting-port': 'CP1-SFP4-P1'
719 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
721 self.assertDictEqual(
723 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
725 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
726 response = test_utils.check_netconf_node_request(
727 "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e")
728 self.assertEqual(response.status_code, requests.codes.ok)
729 res = response.json()
730 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
731 'administrative-state': 'inService',
732 'supporting-circuit-pack-name': 'CP1-SFP4',
733 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
734 'type': 'org-openroadm-interfaces:otnOdu',
735 'supporting-port': 'CP1-SFP4-P1'}
737 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
738 'rate': 'org-openroadm-otn-common-types:ODU2e',
739 'monitoring-mode': 'terminated'}
741 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
743 self.assertDictEqual(dict(input_dict_2,
744 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
745 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
746 self.assertDictEqual(
747 {'payload-type': '03', 'exp-payload-type': '03'},
748 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
750 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
751 response = test_utils.check_netconf_node_request(
752 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e")
753 self.assertEqual(response.status_code, requests.codes.ok)
754 res = response.json()
755 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
756 'administrative-state': 'inService',
757 'supporting-circuit-pack-name': 'CP1-CFP0',
758 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
759 'type': 'org-openroadm-interfaces:otnOdu',
760 'supporting-port': 'CP1-CFP0-P1'}
762 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
763 'rate': 'org-openroadm-otn-common-types:ODU2e',
764 'monitoring-mode': 'monitored'}
766 input_dict_3 = {'trib-port-number': 1}
768 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
770 self.assertDictEqual(dict(input_dict_2,
771 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
772 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
773 self.assertDictEqual(dict(input_dict_3,
774 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
775 'parent-odu-allocation']),
776 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
777 'parent-odu-allocation'])
780 'org-openroadm-otn-odu-interfaces:odu'][
781 'parent-odu-allocation']['trib-slots'])
783 def test_38_check_ODU2E_connection_spdrc(self):
784 response = test_utils.check_netconf_node_request(
786 "odu-connection/XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e")
787 self.assertEqual(response.status_code, requests.codes.ok)
788 res = response.json()
791 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
792 'direction': 'bidirectional'
795 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
796 res['odu-connection'][0])
797 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
798 res['odu-connection'][0]['destination'])
799 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
800 res['odu-connection'][0]['source'])
802 def test_39_check_otn_topo_links(self):
803 response = test_utils.get_otn_topo_request()
804 self.assertEqual(response.status_code, requests.codes.ok)
805 res = response.json()
806 nb_links = len(res['network'][0]['ietf-network-topology:link'])
807 self.assertEqual(nb_links, 4)
808 for link in res['network'][0]['ietf-network-topology:link']:
809 linkId = link['link-id']
810 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
811 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
813 link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
815 link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
817 def test_40_check_otn_topo_tp(self):
818 response = test_utils.get_otn_topo_request()
819 res = response.json()
820 for node in res['network'][0]['node']:
821 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
822 tpList = node['ietf-network-topology:termination-point']
824 if tp['tp-id'] == 'XPDR1-NETWORK1':
825 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
826 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
827 tsPoolList = list(range(1, 9))
829 tsPoolList, xpdrTpPortConAt['ts-pool'])
831 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
833 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
835 def test_41_delete_10GE_service(self):
836 response = test_utils.service_delete_request("service1-10GE")
837 self.assertEqual(response.status_code, requests.codes.ok)
838 res = response.json()
839 self.assertIn('Renderer service delete in progress',
840 res['output']['configuration-response-common']['response-message'])
841 time.sleep(self.WAITING)
843 def test_42_check_service_list(self):
844 response = test_utils.get_service_list_request("")
845 self.assertEqual(response.status_code, requests.codes.ok)
846 res = response.json()
847 self.assertEqual(len(res['service-list']['services']), 2)
850 def test_43_check_no_ODU2e_connection_spdra(self):
851 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
852 self.assertEqual(response.status_code, requests.codes.ok)
853 res = response.json()
854 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
857 def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
858 response = test_utils.check_netconf_node_request(
859 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
860 self.assertEqual(response.status_code, requests.codes.conflict)
862 def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
863 response = test_utils.check_netconf_node_request(
864 "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
865 self.assertEqual(response.status_code, requests.codes.conflict)
867 def test_46_check_no_interface_10GE_CLIENT_spdra(self):
868 response = test_utils.check_netconf_node_request(
869 "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
870 self.assertEqual(response.status_code, requests.codes.conflict)
872 def test_47_check_otn_topo_links(self):
873 response = test_utils.get_otn_topo_request()
874 self.assertEqual(response.status_code, requests.codes.ok)
875 res = response.json()
876 nb_links = len(res['network'][0]['ietf-network-topology:link'])
877 self.assertEqual(nb_links, 4)
878 for link in res['network'][0]['ietf-network-topology:link']:
879 linkId = link['link-id']
880 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
881 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
883 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
885 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
887 def test_48_check_otn_topo_tp(self):
888 response = test_utils.get_otn_topo_request()
889 res = response.json()
890 for node in res['network'][0]['node']:
891 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
892 tpList = node['ietf-network-topology:termination-point']
894 if tp['tp-id'] == 'XPDR1-NETWORK1':
895 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
896 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
898 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
900 def test_49_delete_ODU4_service(self):
901 response = test_utils.service_delete_request("service1-ODU4")
902 self.assertEqual(response.status_code, requests.codes.ok)
903 res = response.json()
904 self.assertIn('Renderer service delete in progress',
905 res['output']['configuration-response-common']['response-message'])
906 time.sleep(self.WAITING)
908 def test_50_check_service_list(self):
909 response = test_utils.get_service_list_request("")
910 self.assertEqual(response.status_code, requests.codes.ok)
911 res = response.json()
912 self.assertEqual(len(res['service-list']['services']), 1)
915 def test_51_check_no_interface_ODU4_spdra(self):
916 response = test_utils.check_netconf_node_request(
917 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
918 self.assertEqual(response.status_code, requests.codes.conflict)
920 def test_52_check_otn_topo_links(self):
921 self.test_22_check_otn_topo_otu4_links()
923 def test_53_check_otn_topo_tp(self):
924 response = test_utils.get_otn_topo_request()
925 res = response.json()
926 for node in res['network'][0]['node']:
927 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
928 tpList = node['ietf-network-topology:termination-point']
930 if tp['tp-id'] == 'XPDR1-NETWORK1':
931 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
932 self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
934 'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
936 def test_54_delete_OCH_OTU4_service(self):
937 response = test_utils.service_delete_request("service1-OCH-OTU4")
938 self.assertEqual(response.status_code, requests.codes.ok)
939 res = response.json()
940 self.assertIn('Renderer service delete in progress',
941 res['output']['configuration-response-common']['response-message'])
942 time.sleep(self.WAITING)
944 def test_55_get_no_service(self):
945 response = test_utils.get_service_list_request("")
946 self.assertEqual(response.status_code, requests.codes.conflict)
947 res = response.json()
949 {"error-type": "application", "error-tag": "data-missing",
950 "error-message": "Request could not be completed because the relevant data model content does not exist"},
951 res['errors']['error'])
954 def test_56_check_no_interface_OTU4_spdra(self):
955 response = test_utils.check_netconf_node_request(
956 "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
957 self.assertEqual(response.status_code, requests.codes.conflict)
959 def test_57_check_no_interface_OCH_spdra(self):
960 response = test_utils.check_netconf_node_request(
961 "SPDR-SA1", "interface/XPDR1-NETWORK1-1")
962 self.assertEqual(response.status_code, requests.codes.conflict)
964 def test_58_getLinks_OtnTopology(self):
965 response = test_utils.get_otn_topo_request()
966 self.assertEqual(response.status_code, requests.codes.ok)
967 res = response.json()
968 self.assertNotIn('ietf-network-topology:link', res['network'][0])
970 def test_59_check_openroadm_topo_spdra(self):
971 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
972 self.assertEqual(response.status_code, requests.codes.ok)
973 res = response.json()
974 tp = res['node'][0]['ietf-network-topology:termination-point'][0]
975 self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
976 self.assertNotIn('wavelength', dict.keys(
977 tp['org-openroadm-network-topology:xpdr-network-attributes']))
980 def test_60_check_openroadm_topo_ROADMA_SRG(self):
981 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
982 self.assertEqual(response.status_code, requests.codes.ok)
983 res = response.json()
984 freq_map = base64.b64decode(
985 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
986 freq_map_array = [int(x) for x in freq_map]
987 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
988 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
990 if ele['tp-id'] == 'SRG1-PP1-TXRX':
991 freq_map = base64.b64decode(
992 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
993 freq_map_array = [int(x) for x in freq_map]
994 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
997 def test_61_check_openroadm_topo_ROADMA_DEG(self):
998 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
999 self.assertEqual(response.status_code, requests.codes.ok)
1000 res = response.json()
1001 freq_map = base64.b64decode(
1002 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1003 freq_map_array = [int(x) for x in freq_map]
1004 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1005 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1006 for ele in liste_tp:
1007 if ele['tp-id'] == 'DEG2-CTP-TXRX':
1008 freq_map = base64.b64decode(
1009 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1010 freq_map_array = [int(x) for x in freq_map]
1011 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1012 if ele['tp-id'] == 'DEG2-TTP-TXRX':
1013 freq_map = base64.b64decode(
1014 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1015 freq_map_array = [int(x) for x in freq_map]
1016 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1019 def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
1020 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "3", "1",
1021 "ROADM-A1", "1", "SRG1-PP2-TXRX")
1022 self.assertEqual(response.status_code, requests.codes.ok)
1023 res = response.json()
1024 self.assertIn('Xponder Roadm Link created successfully',
1025 res["output"]["result"])
1028 def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
1029 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "3", "1",
1030 "ROADM-A1", "1", "SRG1-PP2-TXRX")
1031 self.assertEqual(response.status_code, requests.codes.ok)
1032 res = response.json()
1033 self.assertIn('Roadm Xponder links created successfully',
1034 res["output"]["result"])
1037 def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
1038 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "3", "1",
1039 "ROADM-C1", "1", "SRG1-PP2-TXRX")
1040 self.assertEqual(response.status_code, requests.codes.ok)
1041 res = response.json()
1042 self.assertIn('Xponder Roadm Link created successfully',
1043 res["output"]["result"])
1046 def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
1047 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "3", "1",
1048 "ROADM-C1", "1", "SRG1-PP2-TXRX")
1049 self.assertEqual(response.status_code, requests.codes.ok)
1050 res = response.json()
1051 self.assertIn('Roadm Xponder links created successfully',
1052 res["output"]["result"])
1055 def test_66_create_OCH_OTU4_service_2(self):
1056 # pylint: disable=line-too-long
1057 self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
1058 self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1059 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1060 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1061 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1062 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1063 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1064 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1065 self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1066 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1067 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1068 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1069 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1070 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1071 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1072 self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1073 response = test_utils.service_create_request(self.cr_serv_sample_data)
1074 self.assertEqual(response.status_code, requests.codes.ok)
1075 res = response.json()
1076 self.assertIn('PCE calculation in progress',
1077 res['output']['configuration-response-common']['response-message'])
1078 time.sleep(self.WAITING)
1080 def test_67_get_OCH_OTU4_service2(self):
1081 response = test_utils.get_service_list_request(
1082 "services/service2-OCH-OTU4")
1083 self.assertEqual(response.status_code, requests.codes.ok)
1084 res = response.json()
1086 res['services'][0]['administrative-state'], 'inService')
1088 res['services'][0]['service-name'], 'service2-OCH-OTU4')
1090 res['services'][0]['connection-type'], 'infrastructure')
1092 res['services'][0]['lifecycle-state'], 'planned')
1095 def test_68_create_ODU4_service_2(self):
1096 # pylint: disable=line-too-long
1097 self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
1098 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
1099 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1100 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1101 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1102 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1103 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1104 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1105 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
1106 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1107 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1108 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1109 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1110 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1111 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1113 response = test_utils.service_create_request(self.cr_serv_sample_data)
1114 self.assertEqual(response.status_code, requests.codes.ok)
1115 res = response.json()
1116 self.assertIn('PCE calculation in progress',
1117 res['output']['configuration-response-common']['response-message'])
1118 time.sleep(self.WAITING)
1120 def test_69_get_ODU4_service2(self):
1121 response = test_utils.get_service_list_request(
1122 "services/service2-ODU4")
1123 self.assertEqual(response.status_code, requests.codes.ok)
1124 res = response.json()
1126 res['services'][0]['administrative-state'], 'inService')
1128 res['services'][0]['service-name'], 'service2-ODU4')
1130 res['services'][0]['connection-type'], 'infrastructure')
1132 res['services'][0]['lifecycle-state'], 'planned')
1135 def test_70_create_1GE_service(self):
1136 # pylint: disable=line-too-long
1137 self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
1138 self.cr_serv_sample_data["input"]["connection-type"] = "service"
1139 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
1140 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1141 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
1142 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1143 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
1144 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1145 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1146 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1147 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1148 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
1149 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1150 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1151 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1152 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1153 response = test_utils.service_create_request(self.cr_serv_sample_data)
1154 self.assertEqual(response.status_code, requests.codes.ok)
1155 res = response.json()
1156 self.assertIn('PCE calculation in progress',
1157 res['output']['configuration-response-common']['response-message'])
1158 time.sleep(self.WAITING)
1160 def test_71_get_1GE_service1(self):
1161 response = test_utils.get_service_list_request("services/service1-1GE")
1162 self.assertEqual(response.status_code, requests.codes.ok)
1163 res = response.json()
1165 res['services'][0]['administrative-state'], 'inService')
1167 res['services'][0]['service-name'], 'service1-1GE')
1169 res['services'][0]['connection-type'], 'service')
1171 res['services'][0]['lifecycle-state'], 'planned')
1174 def test_72_check_interface_1GE_CLIENT_spdra(self):
1175 response = test_utils.check_netconf_node_request(
1176 "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1177 self.assertEqual(response.status_code, requests.codes.ok)
1178 res = response.json()
1179 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1180 'administrative-state': 'inService',
1181 'supporting-circuit-pack-name': 'CP1-SFP4',
1182 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1183 'supporting-port': 'CP1-SFP4-P1'
1185 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1186 res['interface'][0])
1187 self.assertDictEqual(
1189 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1191 def test_73_check_interface_ODU0_CLIENT_spdra(self):
1192 response = test_utils.check_netconf_node_request(
1193 "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0")
1194 self.assertEqual(response.status_code, requests.codes.ok)
1195 res = response.json()
1196 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1197 'administrative-state': 'inService',
1198 'supporting-circuit-pack-name': 'CP3-SFP1',
1199 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1200 'type': 'org-openroadm-interfaces:otnOdu',
1201 'supporting-port': 'CP3-SFP1-P1'}
1203 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1204 'rate': 'org-openroadm-otn-common-types:ODU0',
1205 'monitoring-mode': 'terminated'}
1207 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1208 res['interface'][0])
1209 self.assertDictEqual(dict(input_dict_2,
1210 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1211 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1212 self.assertDictEqual(
1213 {'payload-type': '07', 'exp-payload-type': '07'},
1214 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1216 def test_74_check_interface_ODU0_NETWORK_spdra(self):
1217 response = test_utils.check_netconf_node_request(
1218 "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0")
1219 self.assertEqual(response.status_code, requests.codes.ok)
1220 res = response.json()
1221 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1222 'administrative-state': 'inService',
1223 'supporting-circuit-pack-name': 'CP3-CFP0',
1224 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1225 'type': 'org-openroadm-interfaces:otnOdu',
1226 'supporting-port': 'CP3-CFP0-P1'}
1228 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1229 'rate': 'org-openroadm-otn-common-types:ODU0',
1230 'monitoring-mode': 'monitored'}
1231 input_dict_3 = {'trib-port-number': 1}
1233 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1234 res['interface'][0])
1235 self.assertDictEqual(dict(input_dict_2,
1236 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1237 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1238 self.assertDictEqual(dict(input_dict_3,
1239 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1240 'parent-odu-allocation']),
1241 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1242 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1245 def test_75_check_ODU0_connection_spdra(self):
1246 response = test_utils.check_netconf_node_request(
1248 "odu-connection/XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0")
1249 self.assertEqual(response.status_code, requests.codes.ok)
1250 res = response.json()
1253 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1254 'direction': 'bidirectional'
1257 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1258 res['odu-connection'][0])
1259 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1260 res['odu-connection'][0]['destination'])
1261 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1262 res['odu-connection'][0]['source'])
1264 def test_76_check_interface_1GE_CLIENT_spdrc(self):
1265 response = test_utils.check_netconf_node_request(
1266 "SPDR-SC1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1267 self.assertEqual(response.status_code, requests.codes.ok)
1268 res = response.json()
1269 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1270 'administrative-state': 'inService',
1271 'supporting-circuit-pack-name': 'CP3-SFP1',
1272 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1273 'supporting-port': 'CP3-SFP1-P1'
1275 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1276 res['interface'][0])
1277 self.assertDictEqual(
1279 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1281 def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1282 response = test_utils.check_netconf_node_request(
1283 "SPDR-SC1", "interface/XPDR3-CLIENT1-ODU0")
1284 self.assertEqual(response.status_code, requests.codes.ok)
1285 res = response.json()
1286 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1287 'administrative-state': 'inService',
1288 'supporting-circuit-pack-name': 'CP3-SFP1',
1289 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1290 'type': 'org-openroadm-interfaces:otnOdu',
1291 'supporting-port': 'CP3-SFP1-P1'}
1293 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1294 'rate': 'org-openroadm-otn-common-types:ODU0',
1295 'monitoring-mode': 'terminated'}
1297 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1298 res['interface'][0])
1299 self.assertDictEqual(dict(input_dict_2,
1300 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1301 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1302 self.assertDictEqual(
1303 {'payload-type': '07', 'exp-payload-type': '07'},
1304 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1306 def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1307 response = test_utils.check_netconf_node_request(
1308 "SPDR-SC1", "interface/XPDR3-NETWORK1-ODU0")
1309 self.assertEqual(response.status_code, requests.codes.ok)
1310 res = response.json()
1311 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1312 'administrative-state': 'inService',
1313 'supporting-circuit-pack-name': 'CP3-CFP0',
1314 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1315 'type': 'org-openroadm-interfaces:otnOdu',
1316 'supporting-port': 'CP3-CFP0-P1'}
1318 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1319 'rate': 'org-openroadm-otn-common-types:ODU0',
1320 'monitoring-mode': 'monitored'}
1322 input_dict_3 = {'trib-port-number': 1}
1324 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1325 res['interface'][0])
1326 self.assertDictEqual(dict(input_dict_2,
1327 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1328 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1329 self.assertDictEqual(dict(input_dict_3,
1330 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1331 'parent-odu-allocation']),
1332 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1333 'parent-odu-allocation'])
1335 res['interface'][0][
1336 'org-openroadm-otn-odu-interfaces:odu'][
1337 'parent-odu-allocation']['trib-slots'])
1339 def test_79_check_ODU0_connection_spdrc(self):
1340 response = test_utils.check_netconf_node_request(
1342 "odu-connection/XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0")
1343 self.assertEqual(response.status_code, requests.codes.ok)
1344 res = response.json()
1347 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1348 'direction': 'bidirectional'
1351 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1352 res['odu-connection'][0])
1353 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1354 res['odu-connection'][0]['destination'])
1355 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1356 res['odu-connection'][0]['source'])
1358 def test_80_check_otn_topo_links(self):
1359 response = test_utils.get_otn_topo_request()
1360 self.assertEqual(response.status_code, requests.codes.ok)
1361 res = response.json()
1362 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1363 self.assertEqual(nb_links, 4)
1364 for link in res['network'][0]['ietf-network-topology:link']:
1365 linkId = link['link-id']
1366 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1367 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1369 link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1371 link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1373 def test_81_check_otn_topo_tp(self):
1374 response = test_utils.get_otn_topo_request()
1375 res = response.json()
1376 for node in res['network'][0]['node']:
1377 if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1378 tpList = node['ietf-network-topology:termination-point']
1380 if tp['tp-id'] == 'XPDR3-NETWORK1':
1381 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1382 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1383 tsPoolList = list(range(1, 2))
1385 tsPoolList, xpdrTpPortConAt['ts-pool'])
1387 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1389 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1391 def test_82_delete_1GE_service(self):
1392 response = test_utils.service_delete_request("service1-1GE")
1393 self.assertEqual(response.status_code, requests.codes.ok)
1394 res = response.json()
1395 self.assertIn('Renderer service delete in progress',
1396 res['output']['configuration-response-common']['response-message'])
1397 time.sleep(self.WAITING)
1399 def test_83_check_service_list(self):
1400 response = test_utils.get_service_list_request("")
1401 self.assertEqual(response.status_code, requests.codes.ok)
1402 res = response.json()
1403 self.assertEqual(len(res['service-list']['services']), 2)
1406 def test_84_check_no_ODU0_connection_spdra(self):
1407 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1408 self.assertEqual(response.status_code, requests.codes.ok)
1409 res = response.json()
1410 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1413 def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1414 response = test_utils.check_netconf_node_request(
1415 "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1")
1416 self.assertEqual(response.status_code, requests.codes.conflict)
1418 def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1419 response = test_utils.check_netconf_node_request(
1420 "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1")
1421 self.assertEqual(response.status_code, requests.codes.conflict)
1423 def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1424 response = test_utils.check_netconf_node_request(
1425 "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1426 self.assertEqual(response.status_code, requests.codes.conflict)
1428 def test_88_check_otn_topo_links(self):
1429 response = test_utils.get_otn_topo_request()
1430 self.assertEqual(response.status_code, requests.codes.ok)
1431 res = response.json()
1432 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1433 self.assertEqual(nb_links, 4)
1434 for link in res['network'][0]['ietf-network-topology:link']:
1435 linkId = link['link-id']
1436 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1437 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1439 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1441 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1443 def test_89_check_otn_topo_tp(self):
1444 response = test_utils.get_otn_topo_request()
1445 res = response.json()
1446 for node in res['network'][0]['node']:
1447 if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1448 tpList = node['ietf-network-topology:termination-point']
1450 if tp['tp-id'] == 'XPDR3-NETWORK1':
1451 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1452 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1454 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1456 def test_90_delete_ODU4_service(self):
1457 response = test_utils.service_delete_request("service2-ODU4")
1458 self.assertEqual(response.status_code, requests.codes.ok)
1459 res = response.json()
1460 self.assertIn('Renderer service delete in progress',
1461 res['output']['configuration-response-common']['response-message'])
1462 time.sleep(self.WAITING)
1464 def test_91_delete_OCH_OTU4_service(self):
1465 response = test_utils.service_delete_request("service2-OCH-OTU4")
1466 self.assertEqual(response.status_code, requests.codes.ok)
1467 res = response.json()
1468 self.assertIn('Renderer service delete in progress',
1469 res['output']['configuration-response-common']['response-message'])
1470 time.sleep(self.WAITING)
1472 def test_92_disconnect_xponders_from_roadm(self):
1473 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1474 response = test_utils.get_ordm_topo_request("")
1475 self.assertEqual(response.status_code, requests.codes.ok)
1476 res = response.json()
1477 links = res['network'][0]['ietf-network-topology:link']
1479 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1480 link_name = link["link-id"]
1481 response = test_utils.delete_request(url+link_name)
1482 self.assertEqual(response.status_code, requests.codes.ok)
1484 def test_93_check_openroadm_topology(self):
1485 response = test_utils.get_ordm_topo_request("")
1486 self.assertEqual(response.status_code, requests.codes.ok)
1487 res = response.json()
1488 links = res['network'][0]['ietf-network-topology:link']
1489 self.assertEqual(18, len(links), 'Topology should contain 18 links')
1491 def test_94_disconnect_spdrA(self):
1492 response = test_utils.unmount_device("SPDR-SA1")
1493 self.assertEqual(response.status_code, requests.codes.ok,
1494 test_utils.CODE_SHOULD_BE_200)
1496 def test_95_disconnect_spdrC(self):
1497 response = test_utils.unmount_device("SPDR-SC1")
1498 self.assertEqual(response.status_code, requests.codes.ok,
1499 test_utils.CODE_SHOULD_BE_200)
1501 def test_96_disconnect_roadmA(self):
1502 response = test_utils.unmount_device("ROADM-A1")
1503 self.assertEqual(response.status_code, requests.codes.ok,
1504 test_utils.CODE_SHOULD_BE_200)
1506 def test_97_disconnect_roadmC(self):
1507 response = test_utils.unmount_device("ROADM-C1")
1508 self.assertEqual(response.status_code, requests.codes.ok,
1509 test_utils.CODE_SHOULD_BE_200)
1512 if __name__ == "__main__":
1513 unittest.main(verbosity=2)