3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
20 sys.path.append('transportpce_tests/common/')
21 import test_utils # nopep8
24 class TransportPCEtesting(unittest.TestCase):
27 WAITING = 20 # nominal value is 300
28 NODE_VERSION = '2.2.1'
30 cr_serv_sample_data = {"input": {
31 "sdnc-request-header": {
32 "request-id": "request-1",
33 "rpc-action": "service-create",
34 "request-system-id": "appname"
36 "service-name": "service-OCH-OTU4-AB",
37 "common-id": "commonId",
38 "connection-type": "infrastructure",
40 "service-rate": "100",
41 "node-id": "SPDR-SA1",
42 "service-format": "OTU",
43 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
47 "committed-info-rate": "100000",
48 "committed-burst-size": "64"
53 "port-device-name": "SPDR-SA1-XPDR2",
55 "port-name": "XPDR2-NETWORK1",
56 "port-rack": "000000.00",
57 "port-shelf": "Chassis#1"
60 "lgx-device-name": "Some lgx-device-name",
61 "lgx-port-name": "Some lgx-port-name",
62 "lgx-port-rack": "000000.00",
63 "lgx-port-shelf": "00"
68 "port-device-name": "SPDR-SA1-XPDR2",
70 "port-name": "XPDR2-NETWORK1",
71 "port-rack": "000000.00",
72 "port-shelf": "Chassis#1"
75 "lgx-device-name": "Some lgx-device-name",
76 "lgx-port-name": "Some lgx-port-name",
77 "lgx-port-rack": "000000.00",
78 "lgx-port-shelf": "00"
84 "service-rate": "100",
85 "node-id": "SPDR-SB1",
86 "service-format": "OTU",
87 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
91 "committed-info-rate": "100000",
92 "committed-burst-size": "64"
97 "port-device-name": "SPDR-SB1-XPDR2",
99 "port-name": "XPDR2-NETWORK1",
100 "port-rack": "000000.00",
101 "port-shelf": "Chassis#1"
104 "lgx-device-name": "Some lgx-device-name",
105 "lgx-port-name": "Some lgx-port-name",
106 "lgx-port-rack": "000000.00",
107 "lgx-port-shelf": "00"
112 "port-device-name": "SPDR-SB1-XPDR2",
113 "port-type": "fixed",
114 "port-name": "XPDR2-NETWORK1",
115 "port-rack": "000000.00",
116 "port-shelf": "Chassis#1"
119 "lgx-device-name": "Some lgx-device-name",
120 "lgx-port-name": "Some lgx-port-name",
121 "lgx-port-rack": "000000.00",
122 "lgx-port-shelf": "00"
127 "due-date": "2018-06-15T00:00:01Z",
128 "operator-contact": "pw1234"
134 cls.processes = test_utils.start_tpce()
135 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
136 ('spdrb', cls.NODE_VERSION),
137 ('spdrc', cls.NODE_VERSION),
138 ('roadma', cls.NODE_VERSION),
139 ('roadmb', cls.NODE_VERSION),
140 ('roadmc', cls.NODE_VERSION)])
143 def tearDownClass(cls):
144 # pylint: disable=not-an-iterable
145 for process in cls.processes:
146 test_utils.shutdown_process(process)
147 print("all processes killed")
152 def test_01_connect_spdrA(self):
153 response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
154 self.assertEqual(response.status_code,
155 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
157 def test_02_connect_spdrB(self):
158 response = test_utils.mount_device("SPDR-SB1", ('spdrb', self.NODE_VERSION))
159 self.assertEqual(response.status_code,
160 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
162 def test_03_connect_spdrC(self):
163 response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
164 self.assertEqual(response.status_code,
165 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
167 def test_04_connect_rdmA(self):
168 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
169 self.assertEqual(response.status_code,
170 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
172 def test_05_connect_rdmB(self):
173 response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
174 self.assertEqual(response.status_code,
175 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
177 def test_06_connect_rdmC(self):
178 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
179 self.assertEqual(response.status_code,
180 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
182 def test_07_connect_sprdA_2_N1_to_roadmA_PP3(self):
183 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "2", "1",
184 "ROADM-A1", "1", "SRG1-PP3-TXRX")
185 self.assertEqual(response.status_code, requests.codes.ok)
186 res = response.json()
187 self.assertIn('Xponder Roadm Link created successfully',
188 res["output"]["result"])
191 def test_08_connect_roadmA_PP3_to_spdrA_2_N1(self):
192 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "2", "1",
193 "ROADM-A1", "1", "SRG1-PP3-TXRX")
194 self.assertEqual(response.status_code, requests.codes.ok)
195 res = response.json()
196 self.assertIn('Roadm Xponder links created successfully',
197 res["output"]["result"])
200 def test_09_connect_sprdC_2_N1_to_roadmC_PP3(self):
201 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "2", "1",
202 "ROADM-C1", "1", "SRG1-PP3-TXRX")
203 self.assertEqual(response.status_code, requests.codes.ok)
204 res = response.json()
205 self.assertIn('Xponder Roadm Link created successfully',
206 res["output"]["result"])
209 def test_10_connect_roadmC_PP3_to_spdrC_2_N1(self):
210 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "2", "1",
211 "ROADM-C1", "1", "SRG1-PP3-TXRX")
212 self.assertEqual(response.status_code, requests.codes.ok)
213 res = response.json()
214 self.assertIn('Roadm Xponder links created successfully',
215 res["output"]["result"])
218 def test_11_connect_sprdB_2_N1_to_roadmB_PP1(self):
219 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "1",
220 "ROADM-B1", "1", "SRG1-PP1-TXRX")
221 self.assertEqual(response.status_code, requests.codes.ok)
222 res = response.json()
223 self.assertIn('Xponder Roadm Link created successfully',
224 res["output"]["result"])
227 def test_12_connect_roadmB_PP1_to_spdrB_2_N1(self):
228 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "1",
229 "ROADM-B1", "1", "SRG1-PP1-TXRX")
230 self.assertEqual(response.status_code, requests.codes.ok)
231 res = response.json()
232 self.assertIn('Roadm Xponder links created successfully',
233 res["output"]["result"])
236 def test_13_connect_sprdB_2_N2_to_roadmB_PP2(self):
237 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "2",
238 "ROADM-B1", "1", "SRG1-PP2-TXRX")
239 self.assertEqual(response.status_code, requests.codes.ok)
240 res = response.json()
241 self.assertIn('Xponder Roadm Link created successfully',
242 res["output"]["result"])
245 def test_14_connect_roadmB_PP2_to_spdrB_2_N2(self):
246 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "2",
247 "ROADM-B1", "1", "SRG1-PP2-TXRX")
248 self.assertEqual(response.status_code, requests.codes.ok)
249 res = response.json()
250 self.assertIn('Roadm Xponder links created successfully',
251 res["output"]["result"])
254 def test_15_add_omsAttributes_ROADMA_ROADMB(self):
255 # Config ROADMA-ROADMB oms-attributes
257 "auto-spanloss": "true",
258 "spanloss-base": 11.4,
259 "spanloss-current": 12,
260 "engineered-spanloss": 12.2,
261 "link-concatenation": [{
264 "SRLG-length": 100000,
266 response = test_utils.add_oms_attr_request(
267 "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
268 self.assertEqual(response.status_code, requests.codes.created)
270 def test_16_add_omsAttributes_ROADMB_ROADMA(self):
271 # Config ROADMB-ROADMA oms-attributes
273 "auto-spanloss": "true",
274 "spanloss-base": 11.4,
275 "spanloss-current": 12,
276 "engineered-spanloss": 12.2,
277 "link-concatenation": [{
280 "SRLG-length": 100000,
282 response = test_utils.add_oms_attr_request(
283 "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
284 self.assertEqual(response.status_code, requests.codes.created)
286 def test_17_add_omsAttributes_ROADMB_ROADMC(self):
287 # Config ROADMB-ROADMC oms-attributes
289 "auto-spanloss": "true",
290 "spanloss-base": 11.4,
291 "spanloss-current": 12,
292 "engineered-spanloss": 12.2,
293 "link-concatenation": [{
296 "SRLG-length": 100000,
298 response = test_utils.add_oms_attr_request(
299 "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
300 self.assertEqual(response.status_code, requests.codes.created)
302 def test_18_add_omsAttributes_ROADMC_ROADMB(self):
303 # Config ROADMC-ROADMB oms-attributes
305 "auto-spanloss": "true",
306 "spanloss-base": 11.4,
307 "spanloss-current": 12,
308 "engineered-spanloss": 12.2,
309 "link-concatenation": [{
312 "SRLG-length": 100000,
314 response = test_utils.add_oms_attr_request(
315 "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
316 self.assertEqual(response.status_code, requests.codes.created)
318 def test_19_create_OTS_ROADMA_DEG1(self):
319 response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
321 self.assertEqual(response.status_code, requests.codes.ok)
322 res = response.json()
323 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
324 res["output"]["result"])
326 def test_20_create_OTS_ROADMB_DEG1(self):
327 response = test_utils.create_ots_oms_request("ROADM-B1", "DEG1-TTP-TXRX")
329 self.assertEqual(response.status_code, requests.codes.ok)
330 res = response.json()
331 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1',
332 res["output"]["result"])
334 def test_21_create_OTS_ROADMB_DEG2(self):
335 response = test_utils.create_ots_oms_request("ROADM-B1", "DEG2-TTP-TXRX")
337 self.assertEqual(response.status_code, requests.codes.ok)
338 res = response.json()
339 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1',
340 res["output"]["result"])
342 def test_22_create_OTS_ROADMC_DEG2(self):
343 response = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
345 self.assertEqual(response.status_code, requests.codes.ok)
346 res = response.json()
347 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
348 res["output"]["result"])
350 def test_23_calculate_span_loss_base_all(self):
351 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
357 response = test_utils.post_request(url, data)
358 self.assertEqual(response.status_code, requests.codes.ok)
359 res = response.json()
360 self.assertIn('Success',
361 res["output"]["result"])
364 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
365 }, res["output"]["spans"])
368 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
369 }, res["output"]["spans"])
372 "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
373 }, res["output"]["spans"])
376 "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
377 }, res["output"]["spans"])
380 "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
381 }, res["output"]["spans"])
384 "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
385 }, res["output"]["spans"])
388 def test_24_check_otn_topology(self):
389 response = test_utils.get_otn_topo_request()
390 self.assertEqual(response.status_code, requests.codes.ok)
391 res = response.json()
392 nbNode = len(res['network'][0]['node'])
393 self.assertEqual(nbNode, 9, 'There should be 9 nodes')
394 self.assertNotIn('ietf-network-topology:link', res['network'][0],
395 'otn-topology should have no link')
397 # test service-create for OCH-OTU4 service from spdrA to spdrB
398 def test_25_create_OCH_OTU4_service_AB(self):
399 response = test_utils.service_create_request(self.cr_serv_sample_data)
400 self.assertEqual(response.status_code, requests.codes.ok)
401 res = response.json()
402 self.assertIn('PCE calculation in progress',
403 res['output']['configuration-response-common']['response-message'])
404 time.sleep(self.WAITING)
406 def test_26_get_OCH_OTU4_service_AB(self):
407 response = test_utils.get_service_list_request(
408 "services/service-OCH-OTU4-AB")
409 self.assertEqual(response.status_code, requests.codes.ok)
410 res = response.json()
412 res['services'][0]['administrative-state'], 'inService')
414 res['services'][0]['service-name'], 'service-OCH-OTU4-AB')
416 res['services'][0]['connection-type'], 'infrastructure')
418 res['services'][0]['lifecycle-state'], 'planned')
421 # Check correct configuration of devices
422 def test_27_check_interface_och_spdra(self):
423 response = test_utils.check_netconf_node_request(
424 "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
425 self.assertEqual(response.status_code, requests.codes.ok)
426 res = response.json()
427 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
428 'administrative-state': 'inService',
429 'supporting-circuit-pack-name': 'CP5-CFP',
430 'type': 'org-openroadm-interfaces:opticalChannel',
431 'supporting-port': 'CP5-CFP-P1'
432 }, **res['interface'][0]),
435 self.assertDictEqual(
436 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
437 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
438 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
440 def test_28_check_interface_OTU4_spdra(self):
441 response = test_utils.check_netconf_node_request(
442 "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
443 self.assertEqual(response.status_code, requests.codes.ok)
444 res = response.json()
445 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
446 'administrative-state': 'inService',
447 'supporting-circuit-pack-name': 'CP5-CFP',
448 'supporting-interface': 'XPDR2-NETWORK1-761:768',
449 'type': 'org-openroadm-interfaces:otnOtu',
450 'supporting-port': 'CP5-CFP-P1'
452 input_dict_2 = {'tx-sapi': 'exT821pFtOc=',
453 'expected-dapi': 'exT821pFtOc=',
454 'rate': 'org-openroadm-otn-common-types:OTU4',
457 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
460 self.assertDictEqual(input_dict_2,
462 ['org-openroadm-otn-otu-interfaces:otu'])
464 def test_29_check_interface_och_spdrB(self):
465 response = test_utils.check_netconf_node_request(
466 "SPDR-SB1", "interface/XPDR2-NETWORK1-761:768")
467 self.assertEqual(response.status_code, requests.codes.ok)
468 res = response.json()
469 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
470 'administrative-state': 'inService',
471 'supporting-circuit-pack-name': 'CP5-CFP',
472 'type': 'org-openroadm-interfaces:opticalChannel',
473 'supporting-port': 'CP5-CFP-P1'
474 }, **res['interface'][0]),
477 self.assertDictEqual(
478 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
479 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
480 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
482 def test_30_check_interface_OTU4_spdrB(self):
483 response = test_utils.check_netconf_node_request(
484 "SPDR-SB1", "interface/XPDR2-NETWORK1-OTU")
485 self.assertEqual(response.status_code, requests.codes.ok)
486 res = response.json()
487 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
488 'administrative-state': 'inService',
489 'supporting-circuit-pack-name': 'CP5-CFP',
490 'supporting-interface': 'XPDR2-NETWORK1-761:768',
491 'type': 'org-openroadm-interfaces:otnOtu',
492 'supporting-port': 'CP5-CFP-P1'
494 input_dict_2 = {'tx-dapi': 'exT821pFtOc=',
495 'expected-sapi': 'exT821pFtOc=',
496 'tx-sapi': 'HPQZi9Cb3Aw=',
497 'expected-dapi': 'HPQZi9Cb3Aw=',
498 'rate': 'org-openroadm-otn-common-types:OTU4',
502 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
505 self.assertDictEqual(input_dict_2,
507 ['org-openroadm-otn-otu-interfaces:otu'])
509 def test_31_check_no_interface_ODU4_spdra(self):
510 response = test_utils.check_netconf_node_request(
511 "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
512 self.assertEqual(response.status_code, requests.codes.conflict)
513 res = response.json()
515 {"error-type": "application", "error-tag": "data-missing",
516 "error-message": "Request could not be completed because the relevant data model content does not exist"},
517 res['errors']['error'])
519 def test_32_check_openroadm_topo_spdra(self):
520 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
521 self.assertEqual(response.status_code, requests.codes.ok)
522 res = response.json()
523 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
524 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
525 self.assertEqual({u'frequency': 196.1,
527 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
528 self.assertEqual('ROADM-A1-SRG1--SRG1-PP3-TXRX',
529 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
532 def test_33_check_openroadm_topo_ROADMA_SRG(self):
533 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
534 self.assertEqual(response.status_code, requests.codes.ok)
535 res = response.json()
536 freq_map = base64.b64decode(
537 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
538 freq_map_array = [int(x) for x in freq_map]
539 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
540 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
542 if ele['tp-id'] == 'SRG1-PP3-TXRX':
543 freq_map = base64.b64decode(
544 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
545 freq_map_array = [int(x) for x in freq_map]
546 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
547 if ele['tp-id'] == 'SRG1-PP2-TXRX':
548 self.assertNotIn('avail-freq-maps', dict.keys(ele))
551 def test_33_check_openroadm_topo_ROADMA_DEG1(self):
552 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG1")
553 self.assertEqual(response.status_code, requests.codes.ok)
554 res = response.json()
555 freq_map = base64.b64decode(
556 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
557 freq_map_array = [int(x) for x in freq_map]
558 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
559 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
561 if ele['tp-id'] == 'DEG1-CTP-TXRX':
562 freq_map = base64.b64decode(
563 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
564 freq_map_array = [int(x) for x in freq_map]
565 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
566 if ele['tp-id'] == 'DEG1-TTP-TXRX':
567 freq_map = base64.b64decode(
568 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
569 freq_map_array = [int(x) for x in freq_map]
570 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
573 def test_34_check_otn_topo_otu4_links(self):
574 response = test_utils.get_otn_topo_request()
575 self.assertEqual(response.status_code, requests.codes.ok)
576 res = response.json()
577 nb_links = len(res['network'][0]['ietf-network-topology:link'])
578 self.assertEqual(nb_links, 2)
579 listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
580 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1']
581 for link in res['network'][0]['ietf-network-topology:link']:
582 self.assertIn(link['link-id'], listLinkId)
584 link['transportpce-topology:otn-link-type'], 'OTU4')
586 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
588 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
590 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
592 link['org-openroadm-common-network:opposite-link'], listLinkId)
594 # test service-create for OCH-OTU4 service from spdrB to spdrC
595 def test_35_create_OCH_OTU4_service_BC(self):
596 self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-BC"
597 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SB1"
598 self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSB"
599 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
600 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
601 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
602 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
603 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
604 self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
605 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
606 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
607 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
608 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
610 response = test_utils.service_create_request(self.cr_serv_sample_data)
611 self.assertEqual(response.status_code, requests.codes.ok)
612 res = response.json()
613 self.assertIn('PCE calculation in progress',
614 res['output']['configuration-response-common']['response-message'])
615 time.sleep(self.WAITING)
617 def test_36_get_OCH_OTU4_service_BC(self):
618 response = test_utils.get_service_list_request(
619 "services/service-OCH-OTU4-BC")
620 self.assertEqual(response.status_code, requests.codes.ok)
621 res = response.json()
623 res['services'][0]['administrative-state'], 'inService')
625 res['services'][0]['service-name'], 'service-OCH-OTU4-BC')
627 res['services'][0]['connection-type'], 'infrastructure')
629 res['services'][0]['lifecycle-state'], 'planned')
632 # Check correct configuration of devices
633 def test_37_check_interface_och_spdrB(self):
634 response = test_utils.check_netconf_node_request(
635 "SPDR-SB1", "interface/XPDR2-NETWORK2-753:760")
636 self.assertEqual(response.status_code, requests.codes.ok)
637 res = response.json()
638 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
639 'administrative-state': 'inService',
640 'supporting-circuit-pack-name': 'CP6-CFP',
641 'type': 'org-openroadm-interfaces:opticalChannel',
642 'supporting-port': 'CP1-CFP0-P1'
643 }, **res['interface'][0]),
646 self.assertDictEqual(
647 {u'frequency': 196.05, u'rate': u'org-openroadm-common-types:R100G',
648 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
649 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
651 def test_38_check_interface_OTU4_spdrB(self):
652 response = test_utils.check_netconf_node_request(
653 "SPDR-SB1", "interface/XPDR2-NETWORK2-OTU")
654 self.assertEqual(response.status_code, requests.codes.ok)
655 res = response.json()
656 input_dict_1 = {'name': 'XPDR2-NETWORK2-OTU',
657 'administrative-state': 'inService',
658 'supporting-circuit-pack-name': 'CP6-CFP',
659 'supporting-interface': 'XPDR2-NETWORK1-753:760',
660 'type': 'org-openroadm-interfaces:otnOtu',
661 'supporting-port': 'CP6-CFP-P1'
663 input_dict_2 = {'tx-sapi': 'HPQZi9Cb3A8=',
664 'expected-dapi': 'HPQZi9Cb3A8=',
665 'rate': 'org-openroadm-otn-common-types:OTU4',
668 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
671 self.assertDictEqual(input_dict_2,
673 ['org-openroadm-otn-otu-interfaces:otu'])
675 def test_39_check_interface_och_spdrC(self):
676 response = test_utils.check_netconf_node_request(
677 "SPDR-SC1", "interface/XPDR2-NETWORK1-753:760")
678 self.assertEqual(response.status_code, requests.codes.ok)
679 res = response.json()
680 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
681 'administrative-state': 'inService',
682 'supporting-circuit-pack-name': 'CP5-CFP',
683 'type': 'org-openroadm-interfaces:opticalChannel',
684 'supporting-port': 'CP5-CFP-P1'
685 }, **res['interface'][0]),
688 self.assertDictEqual(
689 {u'frequency': 196.05, u'rate': u'org-openroadm-common-types:R100G',
690 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
691 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
693 def test_40_check_interface_OTU4_spdrC(self):
694 response = test_utils.check_netconf_node_request(
695 "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU")
696 self.assertEqual(response.status_code, requests.codes.ok)
697 res = response.json()
698 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
699 'administrative-state': 'inService',
700 'supporting-circuit-pack-name': 'CP5-CFP',
701 'supporting-interface': 'XPDR2-NETWORK1-753:760',
702 'type': 'org-openroadm-interfaces:otnOtu',
703 'supporting-port': 'CP5-CFP-P1'
705 input_dict_2 = {'tx-dapi': 'HPQZi9Cb3A8=',
706 'expected-sapi': 'HPQZi9Cb3A8=',
707 'tx-sapi': 'ALx70DYYfGTx',
708 'expected-dapi': 'ALx70DYYfGTx',
709 'rate': 'org-openroadm-otn-common-types:OTU4',
713 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
716 self.assertDictEqual(input_dict_2,
718 ['org-openroadm-otn-otu-interfaces:otu'])
720 def test_41_check_no_interface_ODU4_spdrB(self):
721 response = test_utils.check_netconf_node_request(
722 "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
723 self.assertEqual(response.status_code, requests.codes.conflict)
724 res = response.json()
726 {"error-type": "application", "error-tag": "data-missing",
727 "error-message": "Request could not be completed because the relevant data model content does not exist"},
728 res['errors']['error'])
730 def test_42_check_openroadm_topo_spdrB(self):
731 response = test_utils.get_ordm_topo_request("node/SPDR-SB1-XPDR2")
732 self.assertEqual(response.status_code, requests.codes.ok)
733 res = response.json()
734 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
736 if ele['tp-id'] == 'XPDR2-NETWORK1':
737 self.assertEqual({u'frequency': 196.1,
739 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
740 self.assertEqual('ROADM-B1-SRG1--SRG1-PP1-TXRX',
741 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
742 elif ele['tp-id'] == 'XPDR2-NETWORK2':
743 self.assertEqual({u'frequency': 196.05,
745 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
746 self.assertEqual('ROADM-B1-SRG1--SRG1-PP2-TXRX',
747 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
749 print("ele = {}".format(ele))
750 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
753 def test_43_check_openroadm_topo_ROADMB_SRG1(self):
754 response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
755 self.assertEqual(response.status_code, requests.codes.ok)
756 res = response.json()
757 freq_map = base64.b64decode(
758 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
759 freq_map_array = [int(x) for x in freq_map]
760 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
761 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
762 self.assertEqual(freq_map_array[93], 255, "Lambda 3 should be available")
763 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
765 if ele['tp-id'] == 'SRG1-PP1-TXRX':
766 freq_map = base64.b64decode(
767 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
768 freq_map_array = [int(x) for x in freq_map]
769 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
770 if ele['tp-id'] == 'SRG1-PP2-TXRX':
771 freq_map = base64.b64decode(
772 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
773 freq_map_array = [int(x) for x in freq_map]
774 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
775 if ele['tp-id'] == 'SRG1-PP3-TXRX':
776 self.assertNotIn('avail-freq-maps', dict.keys(ele))
779 def test_44_check_openroadm_topo_ROADMB_DEG2(self):
780 response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
781 self.assertEqual(response.status_code, requests.codes.ok)
782 res = response.json()
783 freq_map = base64.b64decode(
784 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
785 freq_map_array = [int(x) for x in freq_map]
786 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
787 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
789 if ele['tp-id'] == 'DEG2-CTP-TXRX':
790 freq_map = base64.b64decode(
791 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
792 freq_map_array = [int(x) for x in freq_map]
793 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
794 if ele['tp-id'] == 'DEG2-TTP-TXRX':
795 freq_map = base64.b64decode(
796 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
797 freq_map_array = [int(x) for x in freq_map]
798 self.assertEqual(freq_map_array[94], 0, "Lambda 1 should not be available")
801 def test_45_check_otn_topo_otu4_links(self):
802 response = test_utils.get_otn_topo_request()
803 self.assertEqual(response.status_code, requests.codes.ok)
804 res = response.json()
805 nb_links = len(res['network'][0]['ietf-network-topology:link'])
806 self.assertEqual(nb_links, 4)
807 listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
808 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1',
809 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR2-XPDR2-NETWORK1',
810 'OTU4-SPDR-SC1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2']
811 for link in res['network'][0]['ietf-network-topology:link']:
812 self.assertIn(link['link-id'], listLinkId)
814 link['transportpce-topology:otn-link-type'], 'OTU4')
816 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
818 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
820 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
822 link['org-openroadm-common-network:opposite-link'], listLinkId)
824 # test service-create for 100GE service from spdrA to spdrC via spdrB
825 def test_46_create_100GE_service_ABC(self):
826 self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-ABC"
827 self.cr_serv_sample_data["input"]["connection-type"] = "service"
828 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
829 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
830 self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
831 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
832 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
833 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
834 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
835 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
836 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
837 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
838 self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
839 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
840 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
841 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
842 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
843 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
845 response = test_utils.service_create_request(self.cr_serv_sample_data)
846 self.assertEqual(response.status_code, requests.codes.ok)
847 res = response.json()
848 self.assertIn('PCE calculation in progress',
849 res['output']['configuration-response-common']['response-message'])
850 time.sleep(self.WAITING)
852 def test_47_get_100GE_service_ABC(self):
853 response = test_utils.get_service_list_request(
854 "services/service-100GE-ABC")
855 self.assertEqual(response.status_code, requests.codes.ok)
856 res = response.json()
858 res['services'][0]['administrative-state'], 'inService')
860 res['services'][0]['service-name'], 'service-100GE-ABC')
862 res['services'][0]['connection-type'], 'service')
864 res['services'][0]['lifecycle-state'], 'planned')
867 def test_48_check_interface_100GE_CLIENT_spdra(self):
868 response = test_utils.check_netconf_node_request(
869 "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
870 self.assertEqual(response.status_code, requests.codes.ok)
871 res = response.json()
872 input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
873 'administrative-state': 'inService',
874 'supporting-circuit-pack-name': 'CP2-QSFP1',
875 'type': 'org-openroadm-interfaces:ethernetCsmacd',
876 'supporting-port': 'CP2-QSFP1-P1'
878 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
880 self.assertDictEqual(
883 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
885 def test_49_check_interface_ODU4_CLIENT_spdra(self):
886 response = test_utils.check_netconf_node_request(
887 "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
888 self.assertEqual(response.status_code, requests.codes.ok)
889 res = response.json()
890 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
891 'administrative-state': 'inService',
892 'supporting-circuit-pack-name': 'CP2-QSFP1',
893 'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
894 'type': 'org-openroadm-interfaces:otnOdu',
895 'supporting-port': 'CP2-QSFP1-P1'}
897 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
898 'rate': 'org-openroadm-otn-common-types:ODU4',
899 'monitoring-mode': 'terminated'}
901 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
903 self.assertDictEqual(dict(input_dict_2,
904 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
905 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
906 self.assertDictEqual(
907 {u'payload-type': u'07', u'exp-payload-type': u'07'},
908 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
910 def test_50_check_interface_ODU4_NETWORK_spdra(self):
911 response = test_utils.check_netconf_node_request(
912 "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
913 self.assertEqual(response.status_code, requests.codes.ok)
914 res = response.json()
915 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
916 'administrative-state': 'inService',
917 'supporting-circuit-pack-name': 'CP5-CFP',
918 'type': 'org-openroadm-interfaces:otnOdu',
919 'supporting-port': 'CP5-CFP-P1'}
921 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
922 'rate': 'org-openroadm-otn-common-types:ODU4',
923 'monitoring-mode': 'monitored'}
925 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
927 self.assertDictEqual(dict(input_dict_2,
928 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
929 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
930 self.assertDictEqual(
931 {u'payload-type': u'07', u'exp-payload-type': u'07'},
932 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
934 def test_51_check_ODU4_connection_spdra(self):
935 response = test_utils.check_netconf_node_request(
937 "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
938 self.assertEqual(response.status_code, requests.codes.ok)
939 res = response.json()
942 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
943 'direction': 'bidirectional'
946 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
947 res['odu-connection'][0])
948 self.assertDictEqual({u'dst-if': u'XPDR2-NETWORK1-ODU4'},
949 res['odu-connection'][0]['destination'])
950 self.assertDictEqual({u'src-if': u'XPDR2-CLIENT1-ODU4'},
951 res['odu-connection'][0]['source'])
953 def test_52_check_interface_100GE_CLIENT_spdrc(self):
954 response = test_utils.check_netconf_node_request(
955 "SPDR-SC1", "interface/XPDR2-CLIENT1-ETHERNET")
956 self.assertEqual(response.status_code, requests.codes.ok)
957 res = response.json()
958 input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
959 'administrative-state': 'inService',
960 'supporting-circuit-pack-name': 'CP2-QSFP1',
961 'type': 'org-openroadm-interfaces:ethernetCsmacd',
962 'supporting-port': 'CP2-QSFP1-P1'
964 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
966 self.assertDictEqual(
969 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
971 def test_53_check_interface_ODU4_CLIENT_spdrc(self):
972 response = test_utils.check_netconf_node_request(
973 "SPDR-SC1", "interface/XPDR2-CLIENT1-ODU4")
974 self.assertEqual(response.status_code, requests.codes.ok)
975 res = response.json()
976 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
977 'administrative-state': 'inService',
978 'supporting-circuit-pack-name': 'CP2-QSFP1',
979 'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
980 'type': 'org-openroadm-interfaces:otnOdu',
981 'supporting-port': 'CP2-QSFP1-P1'}
983 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
984 'rate': 'org-openroadm-otn-common-types:ODU4',
985 'monitoring-mode': 'terminated'}
987 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
989 self.assertDictEqual(dict(input_dict_2,
990 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
991 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
992 self.assertDictEqual(
993 {u'payload-type': u'07', u'exp-payload-type': u'07'},
994 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
996 def test_54_check_interface_ODU4_NETWORK_spdrc(self):
997 response = test_utils.check_netconf_node_request(
998 "SPDR-SC1", "interface/XPDR2-NETWORK1-ODU4")
999 self.assertEqual(response.status_code, requests.codes.ok)
1000 res = response.json()
1001 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1002 'administrative-state': 'inService',
1003 'supporting-circuit-pack-name': 'CP5-CFP',
1004 'type': 'org-openroadm-interfaces:otnOdu',
1005 'supporting-port': 'CP5-CFP-P1'}
1007 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1008 'rate': 'org-openroadm-otn-common-types:ODU4',
1009 'monitoring-mode': 'monitored'}
1011 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1012 res['interface'][0])
1013 self.assertDictEqual(dict(input_dict_2,
1014 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1015 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1016 self.assertDictEqual(
1017 {u'payload-type': u'07', u'exp-payload-type': u'07'},
1018 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1020 def test_55_check_ODU4_connection_spdrc(self):
1021 response = test_utils.check_netconf_node_request(
1023 "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
1024 self.assertEqual(response.status_code, requests.codes.ok)
1025 res = response.json()
1028 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1029 'direction': 'bidirectional'
1032 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1033 res['odu-connection'][0])
1034 self.assertDictEqual({u'dst-if': u'XPDR2-NETWORK1-ODU4'},
1035 res['odu-connection'][0]['destination'])
1036 self.assertDictEqual({u'src-if': u'XPDR2-CLIENT1-ODU4'},
1037 res['odu-connection'][0]['source'])
1039 def test_56_check_interface_ODU4_NETWORK1_spdrb(self):
1040 response = test_utils.check_netconf_node_request(
1041 "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
1042 self.assertEqual(response.status_code, requests.codes.ok)
1043 res = response.json()
1044 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1045 'administrative-state': 'inService',
1046 'supporting-circuit-pack-name': 'CP5-CFP',
1047 'type': 'org-openroadm-interfaces:otnOdu',
1048 'supporting-port': 'CP5-CFP-P1'}
1050 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1051 'rate': 'org-openroadm-otn-common-types:ODU4',
1052 'monitoring-mode': 'monitored'}
1054 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1055 res['interface'][0])
1056 self.assertDictEqual(dict(input_dict_2,
1057 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1058 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1059 self.assertDictEqual(
1060 {u'payload-type': u'07', u'exp-payload-type': u'07'},
1061 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1063 def test_57_check_interface_ODU4_NETWORK2_spdrb(self):
1064 response = test_utils.check_netconf_node_request(
1065 "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
1066 self.assertEqual(response.status_code, requests.codes.ok)
1067 res = response.json()
1068 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1069 'administrative-state': 'inService',
1070 'supporting-circuit-pack-name': 'CP6-CFP',
1071 'type': 'org-openroadm-interfaces:otnOdu',
1072 'supporting-port': 'CP6-CFP-P1'}
1074 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1075 'rate': 'org-openroadm-otn-common-types:ODU4',
1076 'monitoring-mode': 'monitored'}
1078 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1079 res['interface'][0])
1080 self.assertDictEqual(dict(input_dict_2,
1081 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1082 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1083 self.assertDictEqual(
1084 {u'payload-type': u'07', u'exp-payload-type': u'07'},
1085 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1087 def test_58_check_ODU4_connection_spdrb(self):
1088 response = test_utils.check_netconf_node_request(
1090 "odu-connection/XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4")
1091 self.assertEqual(response.status_code, requests.codes.ok)
1092 res = response.json()
1095 'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
1096 'direction': 'bidirectional'
1099 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1100 res['odu-connection'][0])
1101 self.assertDictEqual({u'dst-if': u'XPDR2-NETWORK2-ODU4'},
1102 res['odu-connection'][0]['destination'])
1103 self.assertDictEqual({u'src-if': u'XPDR2-NETWORK1-ODU4'},
1104 res['odu-connection'][0]['source'])
1106 def test_59_check_otn_topo_links(self):
1107 response = test_utils.get_otn_topo_request()
1108 self.assertEqual(response.status_code, requests.codes.ok)
1109 res = response.json()
1110 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1111 self.assertEqual(nb_links, 4)
1112 for link in res['network'][0]['ietf-network-topology:link']:
1114 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1116 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1118 def test_60_delete_service_100GE_ABC(self):
1119 response = test_utils.service_delete_request("service-100GE-ABC")
1120 self.assertEqual(response.status_code, requests.codes.ok)
1121 res = response.json()
1122 self.assertIn('Renderer service delete in progress',
1123 res['output']['configuration-response-common']['response-message'])
1124 time.sleep(self.WAITING)
1126 def test_61_check_service_list(self):
1127 response = test_utils.get_service_list_request("")
1128 self.assertEqual(response.status_code, requests.codes.ok)
1129 res = response.json()
1130 self.assertEqual(len(res['service-list']['services']), 2)
1133 def test_62_check_no_ODU4_connection_spdra(self):
1134 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1135 self.assertEqual(response.status_code, requests.codes.ok)
1136 res = response.json()
1137 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1140 def test_63_check_no_interface_ODU4_NETWORK_spdra(self):
1141 response = test_utils.check_netconf_node_request(
1142 "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
1143 self.assertEqual(response.status_code, requests.codes.conflict)
1145 def test_64_check_no_interface_ODU4_CLIENT_spdra(self):
1146 response = test_utils.check_netconf_node_request(
1147 "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
1148 self.assertEqual(response.status_code, requests.codes.conflict)
1150 def test_65_check_no_interface_100GE_CLIENT_spdra(self):
1151 response = test_utils.check_netconf_node_request(
1152 "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
1153 self.assertEqual(response.status_code, requests.codes.conflict)
1155 def test_66_check_otn_topo_links(self):
1156 self.test_45_check_otn_topo_otu4_links()
1158 def test_67_delete_OCH_OTU4_service_AB(self):
1159 response = test_utils.service_delete_request("service-OCH-OTU4-AB")
1160 self.assertEqual(response.status_code, requests.codes.ok)
1161 res = response.json()
1162 self.assertIn('Renderer service delete in progress',
1163 res['output']['configuration-response-common']['response-message'])
1164 time.sleep(self.WAITING)
1166 def test_68_delete_OCH_OTU4_service_BC(self):
1167 response = test_utils.service_delete_request("service-OCH-OTU4-BC")
1168 self.assertEqual(response.status_code, requests.codes.ok)
1169 res = response.json()
1170 self.assertIn('Renderer service delete in progress',
1171 res['output']['configuration-response-common']['response-message'])
1172 time.sleep(self.WAITING)
1174 def test_69_get_no_service(self):
1175 response = test_utils.get_service_list_request("")
1176 self.assertEqual(response.status_code, requests.codes.conflict)
1177 res = response.json()
1179 {"error-type": "application", "error-tag": "data-missing",
1180 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1181 res['errors']['error'])
1184 def test_70_check_no_interface_OTU4_spdra(self):
1185 response = test_utils.check_netconf_node_request(
1186 "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
1187 self.assertEqual(response.status_code, requests.codes.conflict)
1189 def test_71_check_no_interface_OCH_spdra(self):
1190 response = test_utils.check_netconf_node_request(
1191 "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
1192 self.assertEqual(response.status_code, requests.codes.conflict)
1194 def test_72_getLinks_OtnTopology(self):
1195 response = test_utils.get_otn_topo_request()
1196 self.assertEqual(response.status_code, requests.codes.ok)
1197 res = response.json()
1198 self.assertNotIn('ietf-network-topology:link', res['network'][0])
1200 def test_73_check_openroadm_topo_spdra(self):
1201 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
1202 self.assertEqual(response.status_code, requests.codes.ok)
1203 res = response.json()
1204 tp = res['node'][0]['ietf-network-topology:termination-point'][0]
1205 self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1206 self.assertNotIn('wavelength', dict.keys(
1207 tp[u'org-openroadm-network-topology:xpdr-network-attributes']))
1210 def test_74_check_openroadm_topo_ROADMB_SRG1(self):
1211 response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
1212 self.assertEqual(response.status_code, requests.codes.ok)
1213 res = response.json()
1214 freq_map = base64.b64decode(
1215 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1216 freq_map_array = [int(x) for x in freq_map]
1217 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1218 self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1219 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1220 for ele in liste_tp:
1221 if ele['tp-id'] == 'SRG1-PP1-TXRX':
1222 freq_map = base64.b64decode(
1223 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1224 freq_map_array = [int(x) for x in freq_map]
1225 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1226 if ele['tp-id'] == 'SRG1-PP2-TXRX':
1227 freq_map = base64.b64decode(
1228 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1229 freq_map_array = [int(x) for x in freq_map]
1230 self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1233 def test_75_check_openroadm_topo_ROADMB_DEG1(self):
1234 response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG1")
1235 self.assertEqual(response.status_code, requests.codes.ok)
1236 res = response.json()
1237 freq_map = base64.b64decode(
1238 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1239 freq_map_array = [int(x) for x in freq_map]
1240 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1241 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1242 for ele in liste_tp:
1243 if ele['tp-id'] == 'DEG1-CTP-TXRX':
1244 freq_map = base64.b64decode(
1245 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1246 freq_map_array = [int(x) for x in freq_map]
1247 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1248 if ele['tp-id'] == 'DEG1-TTP-TXRX':
1249 freq_map = base64.b64decode(
1250 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1251 freq_map_array = [int(x) for x in freq_map]
1252 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1255 def test_76_check_openroadm_topo_ROADMB_DEG2(self):
1256 response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
1257 self.assertEqual(response.status_code, requests.codes.ok)
1258 res = response.json()
1259 freq_map = base64.b64decode(
1260 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1261 freq_map_array = [int(x) for x in freq_map]
1262 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1263 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1264 for ele in liste_tp:
1265 if ele['tp-id'] == 'DEG2-CTP-TXRX':
1266 freq_map = base64.b64decode(
1267 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1268 freq_map_array = [int(x) for x in freq_map]
1269 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1270 if ele['tp-id'] == 'DEG2-TTP-TXRX':
1271 freq_map = base64.b64decode(
1272 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1273 freq_map_array = [int(x) for x in freq_map]
1274 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1277 def test_77_disconnect_xponders_from_roadm(self):
1278 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1279 response = test_utils.get_ordm_topo_request("")
1280 self.assertEqual(response.status_code, requests.codes.ok)
1281 res = response.json()
1282 links = res['network'][0]['ietf-network-topology:link']
1284 if ((link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1285 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT")
1286 and ('SPDR-SB1' in link['link-id'] or 'ROADM-B1' in link['link-id'])):
1287 link_name = link["link-id"]
1288 response = test_utils.delete_request(url+link_name)
1289 self.assertEqual(response.status_code, requests.codes.ok)
1291 def test_78_disconnect_spdrB(self):
1292 response = test_utils.unmount_device("SPDR-SB1")
1293 self.assertEqual(response.status_code, requests.codes.ok,
1294 test_utils.CODE_SHOULD_BE_200)
1296 def test_79_disconnect_roadmB(self):
1297 response = test_utils.unmount_device("ROADM-B1")
1298 self.assertEqual(response.status_code, requests.codes.ok,
1299 test_utils.CODE_SHOULD_BE_200)
1301 def test_80_remove_roadm_to_roadm_links(self):
1302 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1303 response = test_utils.get_ordm_topo_request("")
1304 self.assertEqual(response.status_code, requests.codes.ok)
1305 res = response.json()
1306 links = res['network'][0]['ietf-network-topology:link']
1308 if (link["org-openroadm-common-network:link-type"] == "ROADM-TO-ROADM"
1309 and 'ROADM-B1' in link['link-id']):
1310 link_name = link["link-id"]
1311 response = test_utils.delete_request(url+link_name)
1312 self.assertEqual(response.status_code, requests.codes.ok)
1314 def test_81_add_omsAttributes_ROADMA_ROADMC(self):
1315 # Config ROADMA-ROADMC oms-attributes
1317 "auto-spanloss": "true",
1318 "spanloss-base": 11.4,
1319 "spanloss-current": 12,
1320 "engineered-spanloss": 12.2,
1321 "link-concatenation": [{
1323 "fiber-type": "smf",
1324 "SRLG-length": 100000,
1326 response = test_utils.add_oms_attr_request(
1327 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
1328 self.assertEqual(response.status_code, requests.codes.created)
1330 def test_82_add_omsAttributes_ROADMC_ROADMA(self):
1331 # Config ROADMC-ROADMA oms-attributes
1333 "auto-spanloss": "true",
1334 "spanloss-base": 11.4,
1335 "spanloss-current": 12,
1336 "engineered-spanloss": 12.2,
1337 "link-concatenation": [{
1339 "fiber-type": "smf",
1340 "SRLG-length": 100000,
1342 response = test_utils.add_oms_attr_request(
1343 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
1344 self.assertEqual(response.status_code, requests.codes.created)
1346 def test_83_create_OCH_OTU4_service_AC(self):
1347 self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-AC"
1348 self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1349 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1350 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1351 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1352 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1353 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1354 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1355 self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1356 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1357 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1358 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1359 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1360 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1361 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1362 self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1363 response = test_utils.service_create_request(self.cr_serv_sample_data)
1364 self.assertEqual(response.status_code, requests.codes.ok)
1365 res = response.json()
1366 self.assertIn('PCE calculation in progress',
1367 res['output']['configuration-response-common']['response-message'])
1368 time.sleep(self.WAITING)
1370 def test_84_get_OCH_OTU4_service_AC(self):
1371 response = test_utils.get_service_list_request(
1372 "services/service-OCH-OTU4-AC")
1373 self.assertEqual(response.status_code, requests.codes.ok)
1374 res = response.json()
1376 res['services'][0]['administrative-state'], 'inService')
1378 res['services'][0]['service-name'], 'service-OCH-OTU4-AC')
1380 res['services'][0]['connection-type'], 'infrastructure')
1382 res['services'][0]['lifecycle-state'], 'planned')
1385 # test service-create for 100GE service from spdrA to spdrC via spdrB
1386 def test_85_create_100GE_service_AC(self):
1387 self.cr_serv_sample_data["input"]["service-name"] = "service-100GE-AC"
1388 self.cr_serv_sample_data["input"]["connection-type"] = "service"
1389 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1390 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
1391 self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
1392 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1393 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1394 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1395 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
1396 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1397 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1398 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
1399 self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
1400 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1401 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1402 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1403 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
1404 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-CLIENT1"
1406 response = test_utils.service_create_request(self.cr_serv_sample_data)
1407 self.assertEqual(response.status_code, requests.codes.ok)
1408 res = response.json()
1409 self.assertIn('PCE calculation in progress',
1410 res['output']['configuration-response-common']['response-message'])
1411 time.sleep(self.WAITING)
1413 def test_86_get_100GE_service_AC(self):
1414 response = test_utils.get_service_list_request("services/service-100GE-AC")
1415 self.assertEqual(response.status_code, requests.codes.ok)
1416 res = response.json()
1418 res['services'][0]['administrative-state'], 'inService')
1420 res['services'][0]['service-name'], 'service-100GE-AC')
1422 res['services'][0]['connection-type'], 'service')
1424 res['services'][0]['lifecycle-state'], 'planned')
1427 def test_87_check_configuration_spdra(self):
1428 self.test_48_check_interface_100GE_CLIENT_spdra()
1429 self.test_49_check_interface_ODU4_CLIENT_spdra()
1430 self.test_50_check_interface_ODU4_NETWORK_spdra()
1431 self.test_51_check_ODU4_connection_spdra()
1434 def test_88_check_configuration_spdrc(self):
1435 self.test_52_check_interface_100GE_CLIENT_spdrc()
1436 self.test_53_check_interface_ODU4_CLIENT_spdrc()
1437 self.test_54_check_interface_ODU4_NETWORK_spdrc()
1438 self.test_55_check_ODU4_connection_spdrc()
1440 def test_89_check_otn_topo_links(self):
1441 response = test_utils.get_otn_topo_request()
1442 self.assertEqual(response.status_code, requests.codes.ok)
1443 res = response.json()
1444 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1445 self.assertEqual(nb_links, 2)
1446 for link in res['network'][0]['ietf-network-topology:link']:
1448 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1450 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1452 def test_90_delete_100GE_service_AC(self):
1453 response = test_utils.service_delete_request("service-100GE-AC")
1454 self.assertEqual(response.status_code, requests.codes.ok)
1455 res = response.json()
1456 self.assertIn('Renderer service delete in progress',
1457 res['output']['configuration-response-common']['response-message'])
1458 time.sleep(self.WAITING)
1460 def test_91_check_service_list(self):
1461 response = test_utils.get_service_list_request("")
1462 self.assertEqual(response.status_code, requests.codes.ok)
1463 res = response.json()
1464 self.assertEqual(len(res['service-list']['services']), 1)
1467 def test_92_check_configuration_spdra(self):
1468 self.test_62_check_no_ODU4_connection_spdra()
1469 self.test_63_check_no_interface_ODU4_NETWORK_spdra()
1470 self.test_64_check_no_interface_ODU4_CLIENT_spdra()
1471 self.test_65_check_no_interface_100GE_CLIENT_spdra()
1473 def test_93_check_otn_topo_links(self):
1474 response = test_utils.get_otn_topo_request()
1475 self.assertEqual(response.status_code, requests.codes.ok)
1476 res = response.json()
1477 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1478 self.assertEqual(nb_links, 2)
1479 for link in res['network'][0]['ietf-network-topology:link']:
1481 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1483 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1485 def test_94_disconnect_xponders_from_roadm(self):
1486 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1487 response = test_utils.get_ordm_topo_request("")
1488 self.assertEqual(response.status_code, requests.codes.ok)
1489 res = response.json()
1490 links = res['network'][0]['ietf-network-topology:link']
1492 if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1493 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
1494 link_name = link["link-id"]
1495 response = test_utils.delete_request(url+link_name)
1496 self.assertEqual(response.status_code, requests.codes.ok)
1498 def test_95_disconnect_spdrA(self):
1499 response = test_utils.unmount_device("SPDR-SA1")
1500 self.assertEqual(response.status_code, requests.codes.ok,
1501 test_utils.CODE_SHOULD_BE_200)
1503 def test_96_disconnect_spdrC(self):
1504 response = test_utils.unmount_device("SPDR-SC1")
1505 self.assertEqual(response.status_code, requests.codes.ok,
1506 test_utils.CODE_SHOULD_BE_200)
1508 def test_97_disconnect_roadmA(self):
1509 response = test_utils.unmount_device("ROADM-A1")
1510 self.assertEqual(response.status_code, requests.codes.ok,
1511 test_utils.CODE_SHOULD_BE_200)
1513 def test_98_disconnect_roadmC(self):
1514 response = test_utils.unmount_device("ROADM-C1")
1515 self.assertEqual(response.status_code, requests.codes.ok,
1516 test_utils.CODE_SHOULD_BE_200)
1519 if __name__ == "__main__":
1520 unittest.main(verbosity=2)