3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
19 from common import test_utils
20 from common.flexgrid_utils import INDEX_1_USED_FREQ_MAP, INDEX_1_2_USED_FREQ_MAP, AVAILABLE_FREQ_MAP
23 class TransportPCEtesting(unittest.TestCase):
26 WAITING = 20 # nominal value is 300
28 cr_serv_sample_data = {"input": {
29 "sdnc-request-header": {
30 "request-id": "request-1",
31 "rpc-action": "service-create",
32 "request-system-id": "appname"
34 "service-name": "service1-OCH-OTU4",
35 "common-id": "commonId",
36 "connection-type": "infrastructure",
38 "service-rate": "100",
39 "node-id": "SPDR-SA1",
40 "service-format": "OTU",
41 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
45 "committed-info-rate": "100000",
46 "committed-burst-size": "64"
51 "port-device-name": "SPDR-SA1-XPDR1",
53 "port-name": "XPDR1-NETWORK1",
54 "port-rack": "000000.00",
55 "port-shelf": "Chassis#1"
58 "lgx-device-name": "Some lgx-device-name",
59 "lgx-port-name": "Some lgx-port-name",
60 "lgx-port-rack": "000000.00",
61 "lgx-port-shelf": "00"
66 "port-device-name": "SPDR-SA1-XPDR1",
68 "port-name": "XPDR1-NETWORK1",
69 "port-rack": "000000.00",
70 "port-shelf": "Chassis#1"
73 "lgx-device-name": "Some lgx-device-name",
74 "lgx-port-name": "Some lgx-port-name",
75 "lgx-port-rack": "000000.00",
76 "lgx-port-shelf": "00"
82 "service-rate": "100",
83 "node-id": "SPDR-SC1",
84 "service-format": "OTU",
85 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
89 "committed-info-rate": "100000",
90 "committed-burst-size": "64"
95 "port-device-name": "SPDR-SC1-XPDR1",
97 "port-name": "XPDR1-NETWORK1",
98 "port-rack": "000000.00",
99 "port-shelf": "Chassis#1"
102 "lgx-device-name": "Some lgx-device-name",
103 "lgx-port-name": "Some lgx-port-name",
104 "lgx-port-rack": "000000.00",
105 "lgx-port-shelf": "00"
110 "port-device-name": "SPDR-SC1-XPDR1",
111 "port-type": "fixed",
112 "port-name": "XPDR1-NETWORK1",
113 "port-rack": "000000.00",
114 "port-shelf": "Chassis#1"
117 "lgx-device-name": "Some lgx-device-name",
118 "lgx-port-name": "Some lgx-port-name",
119 "lgx-port-rack": "000000.00",
120 "lgx-port-shelf": "00"
125 "due-date": "2018-06-15T00:00:01Z",
126 "operator-contact": "pw1234"
132 cls.processes = test_utils.start_tpce()
133 cls.processes = test_utils.start_sims(
134 ['spdra', 'roadma', 'roadmc', 'spdrc'])
137 def tearDownClass(cls):
138 # pylint: disable=not-an-iterable
139 for process in cls.processes:
140 test_utils.shutdown_process(process)
141 print("all processes killed")
146 def test_01_connect_spdrA(self):
147 response = test_utils.mount_device("SPDR-SA1", 'spdra')
148 self.assertEqual(response.status_code,
149 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
151 def test_02_connect_spdrC(self):
152 response = test_utils.mount_device("SPDR-SC1", 'spdrc')
153 self.assertEqual(response.status_code,
154 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
156 def test_03_connect_rdmA(self):
157 response = test_utils.mount_device("ROADM-A1", 'roadma')
158 self.assertEqual(response.status_code,
159 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
161 def test_04_connect_rdmC(self):
162 response = test_utils.mount_device("ROADM-C1", 'roadmc')
163 self.assertEqual(response.status_code,
164 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
166 def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
167 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
168 "ROADM-A1", "1", "SRG1-PP1-TXRX")
169 self.assertEqual(response.status_code, requests.codes.ok)
170 res = response.json()
171 self.assertIn('Xponder Roadm Link created successfully',
172 res["output"]["result"])
175 def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
176 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
177 "ROADM-A1", "1", "SRG1-PP1-TXRX")
178 self.assertEqual(response.status_code, requests.codes.ok)
179 res = response.json()
180 self.assertIn('Roadm Xponder links created successfully',
181 res["output"]["result"])
184 def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
185 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
186 "ROADM-C1", "1", "SRG1-PP1-TXRX")
187 self.assertEqual(response.status_code, requests.codes.ok)
188 res = response.json()
189 self.assertIn('Xponder Roadm Link created successfully',
190 res["output"]["result"])
193 def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
194 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
195 "ROADM-C1", "1", "SRG1-PP1-TXRX")
196 self.assertEqual(response.status_code, requests.codes.ok)
197 res = response.json()
198 self.assertIn('Roadm Xponder links created successfully',
199 res["output"]["result"])
202 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
203 # Config ROADMA-ROADMC oms-attributes
205 "auto-spanloss": "true",
206 "spanloss-base": 11.4,
207 "spanloss-current": 12,
208 "engineered-spanloss": 12.2,
209 "link-concatenation": [{
212 "SRLG-length": 100000,
214 response = test_utils.add_oms_attr_request(
215 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
216 self.assertEqual(response.status_code, requests.codes.created)
218 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
219 # Config ROADMC-ROADMA oms-attributes
221 "auto-spanloss": "true",
222 "spanloss-base": 11.4,
223 "spanloss-current": 12,
224 "engineered-spanloss": 12.2,
225 "link-concatenation": [{
228 "SRLG-length": 100000,
230 response = test_utils.add_oms_attr_request(
231 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
232 self.assertEqual(response.status_code, requests.codes.created)
234 # test service-create for OCH-OTU4 service from spdr to spdr
235 def test_11_check_otn_topology(self):
236 response = test_utils.get_otn_topo_request()
237 self.assertEqual(response.status_code, requests.codes.ok)
238 res = response.json()
239 nbNode = len(res['network'][0]['node'])
240 self.assertEqual(nbNode, 6, 'There should be 6 nodes')
241 self.assertNotIn('ietf-network-topology:link', res['network'][0])
243 def test_12_create_OCH_OTU4_service(self):
244 response = test_utils.service_create_request(self.cr_serv_sample_data)
245 self.assertEqual(response.status_code, requests.codes.ok)
246 res = response.json()
247 self.assertIn('PCE calculation in progress',
248 res['output']['configuration-response-common']['response-message'])
249 time.sleep(self.WAITING)
251 def test_13_get_OCH_OTU4_service1(self):
252 response = test_utils.get_service_list_request(
253 "services/service1-OCH-OTU4")
254 self.assertEqual(response.status_code, requests.codes.ok)
255 res = response.json()
257 res['services'][0]['administrative-state'], 'inService')
259 res['services'][0]['service-name'], 'service1-OCH-OTU4')
261 res['services'][0]['connection-type'], 'infrastructure')
263 res['services'][0]['lifecycle-state'], 'planned')
266 # Check correct configuration of devices
267 def test_14_check_interface_och_spdra(self):
268 response = test_utils.check_netconf_node_request(
269 "SPDR-SA1", "interface/XPDR1-NETWORK1-1")
270 self.assertEqual(response.status_code, requests.codes.ok)
271 res = response.json()
272 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
273 'administrative-state': 'inService',
274 'supporting-circuit-pack-name': 'CP1-CFP0',
275 'type': 'org-openroadm-interfaces:opticalChannel',
276 'supporting-port': 'CP1-CFP0-P1'
277 }, **res['interface'][0]),
280 self.assertDictEqual(
281 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
282 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
283 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
285 def test_15_check_interface_OTU4_spdra(self):
286 response = test_utils.check_netconf_node_request(
287 "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
288 self.assertEqual(response.status_code, requests.codes.ok)
289 res = response.json()
290 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
291 'administrative-state': 'inService',
292 'supporting-circuit-pack-name': 'CP1-CFP0',
293 'supporting-interface': 'XPDR1-NETWORK1-1',
294 'type': 'org-openroadm-interfaces:otnOtu',
295 'supporting-port': 'CP1-CFP0-P1'
297 input_dict_2 = {'tx-sapi': 'Swfw02qXGyI=',
298 'expected-dapi': 'Swfw02qXGyI=',
299 'rate': 'org-openroadm-otn-common-types:OTU4',
302 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
305 self.assertDictEqual(input_dict_2,
307 ['org-openroadm-otn-otu-interfaces:otu'])
309 def test_16_check_interface_och_spdrc(self):
310 response = test_utils.check_netconf_node_request(
311 "SPDR-SC1", "interface/XPDR1-NETWORK1-1")
312 self.assertEqual(response.status_code, requests.codes.ok)
313 res = response.json()
314 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-1',
315 'administrative-state': 'inService',
316 'supporting-circuit-pack-name': 'CP1-CFP0',
317 'type': 'org-openroadm-interfaces:opticalChannel',
318 'supporting-port': 'CP1-CFP0-P1'
319 }, **res['interface'][0]),
322 self.assertDictEqual(
323 {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
324 u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
325 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
327 def test_17_check_interface_OTU4_spdrc(self):
328 response = test_utils.check_netconf_node_request(
329 "SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
330 self.assertEqual(response.status_code, requests.codes.ok)
331 res = response.json()
332 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
333 'administrative-state': 'inService',
334 'supporting-circuit-pack-name': 'CP1-CFP0',
335 'supporting-interface': 'XPDR1-NETWORK1-1',
336 'type': 'org-openroadm-interfaces:otnOtu',
337 'supporting-port': 'CP1-CFP0-P1'
339 input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
340 'expected-sapi': 'Swfw02qXGyI=',
341 'tx-sapi': 'fuYZwEO660g=',
342 'expected-dapi': 'fuYZwEO660g=',
343 'rate': 'org-openroadm-otn-common-types:OTU4',
347 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
350 self.assertDictEqual(input_dict_2,
352 ['org-openroadm-otn-otu-interfaces:otu'])
354 def test_18_check_no_interface_ODU4_spdra(self):
355 response = test_utils.check_netconf_node_request(
356 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
357 self.assertEqual(response.status_code, requests.codes.conflict)
358 res = response.json()
360 {"error-type": "application", "error-tag": "data-missing",
361 "error-message": "Request could not be completed because the relevant data model content does not exist"},
362 res['errors']['error'])
364 def test_19_check_openroadm_topo_spdra(self):
365 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
366 self.assertEqual(response.status_code, requests.codes.ok)
367 res = response.json()
368 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
369 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
370 self.assertEqual({u'frequency': 196.1,
372 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
375 def test_20_check_openroadm_topo_ROADMA_SRG(self):
376 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
377 self.assertEqual(response.status_code, requests.codes.ok)
378 res = response.json()
379 freq_map = base64.b64decode(
380 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
381 freq_map_array = [int(x) for x in freq_map]
382 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
383 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
385 if ele['tp-id'] == 'SRG1-PP1-TXRX':
386 self.assertIn({u'index': 1, u'frequency': 196.1,
388 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
389 if ele['tp-id'] == 'SRG1-PP2-TXRX':
390 self.assertNotIn('used-wavelength', dict.keys(ele))
393 def test_21_check_openroadm_topo_ROADMA_DEG(self):
394 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
395 self.assertEqual(response.status_code, requests.codes.ok)
396 res = response.json()
397 freq_map = base64.b64decode(
398 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
399 freq_map_array = [int(x) for x in freq_map]
400 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
401 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
403 if ele['tp-id'] == 'DEG2-CTP-TXRX':
404 self.assertIn({u'map-name': 'cband', u'freq-map-granularity': 6.25, u'start-edge-freq': 191.325,
405 u'effective-bits': 8, u'freq-map': INDEX_1_USED_FREQ_MAP},
406 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'])
407 if ele['tp-id'] == 'DEG2-TTP-TXRX':
408 self.assertIn({u'index': 1, u'frequency': 196.1,
410 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
413 def test_22_check_otn_topo_otu4_links(self):
414 response = test_utils.get_otn_topo_request()
415 self.assertEqual(response.status_code, requests.codes.ok)
416 res = response.json()
417 nb_links = len(res['network'][0]['ietf-network-topology:link'])
418 self.assertEqual(nb_links, 2)
419 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
420 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
421 for link in res['network'][0]['ietf-network-topology:link']:
422 self.assertIn(link['link-id'], listLinkId)
424 link['transportpce-topology:otn-link-type'], 'OTU4')
426 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
428 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
430 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
432 link['org-openroadm-common-network:opposite-link'], listLinkId)
434 # test service-create for ODU4 service from spdr to spdr
435 def test_23_create_ODU4_service(self):
436 self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
437 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
438 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
439 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
440 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
441 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
442 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
444 response = test_utils.service_create_request(self.cr_serv_sample_data)
445 self.assertEqual(response.status_code, requests.codes.ok)
446 res = response.json()
447 self.assertIn('PCE calculation in progress',
448 res['output']['configuration-response-common']['response-message'])
449 time.sleep(self.WAITING)
451 def test_24_get_ODU4_service1(self):
452 response = test_utils.get_service_list_request(
453 "services/service1-ODU4")
454 self.assertEqual(response.status_code, requests.codes.ok)
455 res = response.json()
457 res['services'][0]['administrative-state'], 'inService')
459 res['services'][0]['service-name'], 'service1-ODU4')
461 res['services'][0]['connection-type'], 'infrastructure')
463 res['services'][0]['lifecycle-state'], 'planned')
466 def test_25_check_interface_ODU4_spdra(self):
467 response = test_utils.check_netconf_node_request(
468 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
469 self.assertEqual(response.status_code, requests.codes.ok)
470 res = response.json()
471 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
472 'administrative-state': 'inService',
473 'supporting-circuit-pack-name': 'CP1-CFP0',
474 'supporting-interface': 'XPDR1-NETWORK1-OTU',
475 'type': 'org-openroadm-interfaces:otnOdu',
476 'supporting-port': 'CP1-CFP0-P1'}
477 # SAPI/DAPI are added in the Otu4 renderer
478 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
479 'rate': 'org-openroadm-otn-common-types:ODU4',
480 'expected-dapi': 'Swfw02qXGyI=',
481 'expected-sapi': 'fuYZwEO660g=',
482 'tx-dapi': 'fuYZwEO660g=',
483 'tx-sapi': 'Swfw02qXGyI='}
485 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
487 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
489 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
491 self.assertDictEqual(
492 {u'payload-type': u'21', u'exp-payload-type': u'21'},
493 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
495 def test_26_check_interface_ODU4_spdrc(self):
496 response = test_utils.check_netconf_node_request(
497 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
498 self.assertEqual(response.status_code, requests.codes.ok)
499 res = response.json()
500 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
501 'administrative-state': 'inService',
502 'supporting-circuit-pack-name': 'CP1-CFP0',
503 'supporting-interface': 'XPDR1-NETWORK1-OTU',
504 'type': 'org-openroadm-interfaces:otnOdu',
505 'supporting-port': 'CP1-CFP0-P1'}
506 # SAPI/DAPI are added in the Otu4 renderer
507 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
508 'rate': 'org-openroadm-otn-common-types:ODU4',
509 'tx-sapi': 'fuYZwEO660g=',
510 'tx-dapi': 'Swfw02qXGyI=',
511 'expected-sapi': 'Swfw02qXGyI=',
512 'expected-dapi': 'fuYZwEO660g='
514 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
516 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
518 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
520 self.assertDictEqual(
521 {u'payload-type': u'21', u'exp-payload-type': u'21'},
522 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
524 def test_27_check_otn_topo_links(self):
525 response = test_utils.get_otn_topo_request()
526 self.assertEqual(response.status_code, requests.codes.ok)
527 res = response.json()
528 nb_links = len(res['network'][0]['ietf-network-topology:link'])
529 self.assertEqual(nb_links, 4)
530 for link in res['network'][0]['ietf-network-topology:link']:
531 linkId = link['link-id']
532 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
533 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
535 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
537 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
538 elif (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
539 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
541 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
543 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
545 link['transportpce-topology:otn-link-type'], 'ODTU4')
547 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
548 self.assertIn(link['org-openroadm-common-network:opposite-link'],
549 ['ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
550 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
552 self.fail("this link should not exist")
554 def test_28_check_otn_topo_tp(self):
555 response = test_utils.get_otn_topo_request()
556 res = response.json()
557 for node in res['network'][0]['node']:
558 if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
559 tpList = node['ietf-network-topology:termination-point']
561 if tp['tp-id'] == 'XPDR1-NETWORK1':
562 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
563 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
565 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
566 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
567 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
569 # test service-create for 10GE service from spdr to spdr
570 def test_29_create_10GE_service(self):
571 self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
572 self.cr_serv_sample_data["input"]["connection-type"] = "service"
573 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
574 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
575 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
576 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
577 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
578 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
579 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
580 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
581 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
582 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
583 response = test_utils.service_create_request(self.cr_serv_sample_data)
584 self.assertEqual(response.status_code, requests.codes.ok)
585 res = response.json()
586 self.assertIn('PCE calculation in progress',
587 res['output']['configuration-response-common']['response-message'])
588 time.sleep(self.WAITING)
590 def test_30_get_10GE_service1(self):
591 response = test_utils.get_service_list_request(
592 "services/service1-10GE")
593 self.assertEqual(response.status_code, requests.codes.ok)
594 res = response.json()
596 res['services'][0]['administrative-state'], 'inService')
598 res['services'][0]['service-name'], 'service1-10GE')
600 res['services'][0]['connection-type'], 'service')
602 res['services'][0]['lifecycle-state'], 'planned')
605 def test_31_check_interface_10GE_CLIENT_spdra(self):
606 response = test_utils.check_netconf_node_request(
607 "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
608 self.assertEqual(response.status_code, requests.codes.ok)
609 res = response.json()
610 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
611 'administrative-state': 'inService',
612 'supporting-circuit-pack-name': 'CP1-SFP4',
613 'type': 'org-openroadm-interfaces:ethernetCsmacd',
614 'supporting-port': 'CP1-SFP4-P1'
616 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
618 self.assertDictEqual(
620 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
622 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
623 response = test_utils.check_netconf_node_request(
624 "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
625 self.assertEqual(response.status_code, requests.codes.ok)
626 res = response.json()
627 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
628 'administrative-state': 'inService',
629 'supporting-circuit-pack-name': 'CP1-SFP4',
630 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
631 'type': 'org-openroadm-interfaces:otnOdu',
632 'supporting-port': 'CP1-SFP4-P1'}
634 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
635 'rate': 'org-openroadm-otn-common-types:ODU2e',
636 'monitoring-mode': 'terminated'}
638 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
640 self.assertDictEqual(dict(input_dict_2,
641 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
642 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
643 self.assertDictEqual(
644 {u'payload-type': u'03', u'exp-payload-type': u'03'},
645 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
647 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
648 response = test_utils.check_netconf_node_request(
649 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
650 self.assertEqual(response.status_code, requests.codes.ok)
651 res = response.json()
652 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
653 'administrative-state': 'inService',
654 'supporting-circuit-pack-name': 'CP1-CFP0',
655 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
656 'type': 'org-openroadm-interfaces:otnOdu',
657 'supporting-port': 'CP1-CFP0-P1'}
659 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
660 'rate': 'org-openroadm-otn-common-types:ODU2e',
661 'monitoring-mode': 'monitored'}
662 input_dict_3 = {'trib-port-number': 1}
664 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
666 self.assertDictEqual(dict(input_dict_2,
667 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
668 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
669 self.assertDictEqual(dict(input_dict_3,
670 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
671 'parent-odu-allocation']),
672 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
673 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
676 def test_34_check_ODU2E_connection_spdra(self):
677 response = test_utils.check_netconf_node_request(
679 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
680 self.assertEqual(response.status_code, requests.codes.ok)
681 res = response.json()
684 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
685 'direction': 'bidirectional'
688 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
689 res['odu-connection'][0])
690 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
691 res['odu-connection'][0]['destination'])
692 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
693 res['odu-connection'][0]['source'])
695 def test_35_check_interface_10GE_CLIENT_spdrc(self):
696 response = test_utils.check_netconf_node_request(
697 "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
698 self.assertEqual(response.status_code, requests.codes.ok)
699 res = response.json()
700 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
701 'administrative-state': 'inService',
702 'supporting-circuit-pack-name': 'CP1-SFP4',
703 'type': 'org-openroadm-interfaces:ethernetCsmacd',
704 'supporting-port': 'CP1-SFP4-P1'
706 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
708 self.assertDictEqual(
710 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
712 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
713 response = test_utils.check_netconf_node_request(
714 "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
715 self.assertEqual(response.status_code, requests.codes.ok)
716 res = response.json()
717 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
718 'administrative-state': 'inService',
719 'supporting-circuit-pack-name': 'CP1-SFP4',
720 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
721 'type': 'org-openroadm-interfaces:otnOdu',
722 'supporting-port': 'CP1-SFP4-P1'}
724 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
725 'rate': 'org-openroadm-otn-common-types:ODU2e',
726 'monitoring-mode': 'terminated'}
728 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
730 self.assertDictEqual(dict(input_dict_2,
731 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
732 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
733 self.assertDictEqual(
734 {u'payload-type': u'03', u'exp-payload-type': u'03'},
735 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
737 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
738 response = test_utils.check_netconf_node_request(
739 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
740 self.assertEqual(response.status_code, requests.codes.ok)
741 res = response.json()
742 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
743 'administrative-state': 'inService',
744 'supporting-circuit-pack-name': 'CP1-CFP0',
745 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
746 'type': 'org-openroadm-interfaces:otnOdu',
747 'supporting-port': 'CP1-CFP0-P1'}
749 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
750 'rate': 'org-openroadm-otn-common-types:ODU2e',
751 'monitoring-mode': 'monitored'}
753 input_dict_3 = {'trib-port-number': 1}
755 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
757 self.assertDictEqual(dict(input_dict_2,
758 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
759 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
760 self.assertDictEqual(dict(input_dict_3,
761 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
762 'parent-odu-allocation']),
763 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
764 'parent-odu-allocation'])
767 'org-openroadm-otn-odu-interfaces:odu'][
768 'parent-odu-allocation']['trib-slots'])
770 def test_38_check_ODU2E_connection_spdrc(self):
771 response = test_utils.check_netconf_node_request(
773 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
774 self.assertEqual(response.status_code, requests.codes.ok)
775 res = response.json()
778 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
779 'direction': 'bidirectional'
782 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
783 res['odu-connection'][0])
784 self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1-10GE'},
785 res['odu-connection'][0]['destination'])
786 self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1-10GE'},
787 res['odu-connection'][0]['source'])
789 def test_39_check_otn_topo_links(self):
790 response = test_utils.get_otn_topo_request()
791 self.assertEqual(response.status_code, requests.codes.ok)
792 res = response.json()
793 nb_links = len(res['network'][0]['ietf-network-topology:link'])
794 self.assertEqual(nb_links, 4)
795 for link in res['network'][0]['ietf-network-topology:link']:
796 linkId = link['link-id']
797 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
798 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
800 link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
802 link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
804 def test_40_check_otn_topo_tp(self):
805 response = test_utils.get_otn_topo_request()
806 res = response.json()
807 for node in res['network'][0]['node']:
808 if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
809 tpList = node['ietf-network-topology:termination-point']
811 if tp['tp-id'] == 'XPDR1-NETWORK1':
812 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
813 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
814 tsPoolList = list(range(1, 9))
816 tsPoolList, xpdrTpPortConAt['ts-pool'])
818 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
820 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
822 def test_41_delete_10GE_service(self):
823 response = test_utils.service_delete_request("service1-10GE")
824 self.assertEqual(response.status_code, requests.codes.ok)
825 res = response.json()
826 self.assertIn('Renderer service delete in progress',
827 res['output']['configuration-response-common']['response-message'])
828 time.sleep(self.WAITING)
830 def test_42_check_service_list(self):
831 response = test_utils.get_service_list_request("")
832 self.assertEqual(response.status_code, requests.codes.ok)
833 res = response.json()
834 self.assertEqual(len(res['service-list']['services']), 2)
837 def test_43_check_no_ODU2e_connection_spdra(self):
838 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
839 self.assertEqual(response.status_code, requests.codes.ok)
840 res = response.json()
841 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
844 def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
845 response = test_utils.check_netconf_node_request(
846 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
847 self.assertEqual(response.status_code, requests.codes.conflict)
849 def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
850 response = test_utils.check_netconf_node_request(
851 "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
852 self.assertEqual(response.status_code, requests.codes.conflict)
854 def test_46_check_no_interface_10GE_CLIENT_spdra(self):
855 response = test_utils.check_netconf_node_request(
856 "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
857 self.assertEqual(response.status_code, requests.codes.conflict)
859 def test_47_check_otn_topo_links(self):
860 response = test_utils.get_otn_topo_request()
861 self.assertEqual(response.status_code, requests.codes.ok)
862 res = response.json()
863 nb_links = len(res['network'][0]['ietf-network-topology:link'])
864 self.assertEqual(nb_links, 4)
865 for link in res['network'][0]['ietf-network-topology:link']:
866 linkId = link['link-id']
867 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
868 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
870 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
872 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
874 def test_48_check_otn_topo_tp(self):
875 response = test_utils.get_otn_topo_request()
876 res = response.json()
877 for node in res['network'][0]['node']:
878 if (node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1'):
879 tpList = node['ietf-network-topology:termination-point']
881 if tp['tp-id'] == 'XPDR1-NETWORK1':
882 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
883 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
885 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
887 def test_49_delete_ODU4_service(self):
888 response = test_utils.service_delete_request("service1-ODU4")
889 self.assertEqual(response.status_code, requests.codes.ok)
890 res = response.json()
891 self.assertIn('Renderer service delete in progress',
892 res['output']['configuration-response-common']['response-message'])
893 time.sleep(self.WAITING)
895 def test_50_check_service_list(self):
896 response = test_utils.get_service_list_request("")
897 self.assertEqual(response.status_code, requests.codes.ok)
898 res = response.json()
899 self.assertEqual(len(res['service-list']['services']), 1)
902 def test_51_check_no_interface_ODU4_spdra(self):
903 response = test_utils.check_netconf_node_request(
904 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
905 self.assertEqual(response.status_code, requests.codes.conflict)
907 def test_52_check_otn_topo_links(self):
908 self.test_22_check_otn_topo_otu4_links()
910 def test_53_check_otn_topo_tp(self):
911 response = test_utils.get_otn_topo_request()
912 res = response.json()
913 for node in res['network'][0]['node']:
914 if node['node-id'] == 'SPDR-SA1-XPDR1' or 'SPDR-SC1-XPDR1':
915 tpList = node['ietf-network-topology:termination-point']
917 if tp['tp-id'] == 'XPDR1-NETWORK1':
918 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
919 self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
921 'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
923 def test_54_delete_OCH_OTU4_service(self):
924 response = test_utils.service_delete_request("service1-OCH-OTU4")
925 self.assertEqual(response.status_code, requests.codes.ok)
926 res = response.json()
927 self.assertIn('Renderer service delete in progress',
928 res['output']['configuration-response-common']['response-message'])
929 time.sleep(self.WAITING)
931 def test_55_get_no_service(self):
932 response = test_utils.get_service_list_request("")
933 self.assertEqual(response.status_code, requests.codes.conflict)
934 res = response.json()
936 {"error-type": "application", "error-tag": "data-missing",
937 "error-message": "Request could not be completed because the relevant data model content does not exist"},
938 res['errors']['error'])
941 def test_56_check_no_interface_OTU4_spdra(self):
942 response = test_utils.check_netconf_node_request(
943 "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
944 self.assertEqual(response.status_code, requests.codes.conflict)
946 def test_57_check_no_interface_OCH_spdra(self):
947 response = test_utils.check_netconf_node_request(
948 "SPDR-SA1", "interface/XPDR1-NETWORK1-1")
949 self.assertEqual(response.status_code, requests.codes.conflict)
951 def test_58_getLinks_OtnTopology(self):
952 response = test_utils.get_otn_topo_request()
953 self.assertEqual(response.status_code, requests.codes.ok)
954 res = response.json()
955 self.assertNotIn('ietf-network-topology:link', res['network'][0])
957 def test_59_check_openroadm_topo_spdra(self):
958 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
959 self.assertEqual(response.status_code, requests.codes.ok)
960 res = response.json()
961 tp = res['node'][0]['ietf-network-topology:termination-point'][0]
962 self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
963 self.assertNotIn('wavelength', dict.keys(
964 tp[u'org-openroadm-network-topology:xpdr-network-attributes']))
967 def test_60_check_openroadm_topo_ROADMA_SRG(self):
968 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
969 self.assertEqual(response.status_code, requests.codes.ok)
970 res = response.json()
971 freq_map = base64.b64decode(
972 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
973 freq_map_array = [int(x) for x in freq_map]
974 self.assertEqual(freq_map_array[0], 255, "Index 1 should be available")
975 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
977 if ele['tp-id'] == 'SRG1-PP1-TXRX':
979 'org-openroadm-network-topology:pp-attributes', dict.keys(ele))
982 def test_61_check_openroadm_topo_ROADMA_DEG(self):
983 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
984 self.assertEqual(response.status_code, requests.codes.ok)
985 res = response.json()
987 freq_map = base64.b64decode(
988 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
989 freq_map_array = [int(x) for x in freq_map]
990 self.assertEqual(freq_map_array[0], 255, "Index 1 should be available")
991 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
993 if ele['tp-id'] == 'DEG2-CTP-TXRX':
994 self.assertEqual(ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'],
996 if ele['tp-id'] == 'DEG2-TTP-TXRX':
998 'org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
1001 def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
1002 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "3", "1",
1003 "ROADM-A1", "1", "SRG1-PP2-TXRX")
1004 self.assertEqual(response.status_code, requests.codes.ok)
1005 res = response.json()
1006 self.assertIn('Xponder Roadm Link created successfully',
1007 res["output"]["result"])
1010 def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
1011 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "3", "1",
1012 "ROADM-A1", "1", "SRG1-PP2-TXRX")
1013 self.assertEqual(response.status_code, requests.codes.ok)
1014 res = response.json()
1015 self.assertIn('Roadm Xponder links created successfully',
1016 res["output"]["result"])
1019 def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
1020 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "3", "1",
1021 "ROADM-C1", "1", "SRG1-PP2-TXRX")
1022 self.assertEqual(response.status_code, requests.codes.ok)
1023 res = response.json()
1024 self.assertIn('Xponder Roadm Link created successfully',
1025 res["output"]["result"])
1028 def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
1029 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "3", "1",
1030 "ROADM-C1", "1", "SRG1-PP2-TXRX")
1031 self.assertEqual(response.status_code, requests.codes.ok)
1032 res = response.json()
1033 self.assertIn('Roadm Xponder links created successfully',
1034 res["output"]["result"])
1037 def test_66_create_OCH_OTU4_service_2(self):
1038 self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
1039 self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1040 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1041 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1042 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1043 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1044 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1045 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1046 self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1047 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1048 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1049 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1050 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1051 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1052 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1053 self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1054 response = test_utils.service_create_request(self.cr_serv_sample_data)
1055 self.assertEqual(response.status_code, requests.codes.ok)
1056 res = response.json()
1057 self.assertIn('PCE calculation in progress',
1058 res['output']['configuration-response-common']['response-message'])
1059 time.sleep(self.WAITING)
1061 def test_67_get_OCH_OTU4_service2(self):
1062 response = test_utils.get_service_list_request(
1063 "services/service2-OCH-OTU4")
1064 self.assertEqual(response.status_code, requests.codes.ok)
1065 res = response.json()
1067 res['services'][0]['administrative-state'], 'inService')
1069 res['services'][0]['service-name'], 'service2-OCH-OTU4')
1071 res['services'][0]['connection-type'], 'infrastructure')
1073 res['services'][0]['lifecycle-state'], 'planned')
1076 def test_68_create_ODU4_service_2(self):
1077 self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
1078 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
1079 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1080 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1081 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1082 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1083 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1084 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1085 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
1086 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1087 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1088 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1089 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1090 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
1091 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1093 response = test_utils.service_create_request(self.cr_serv_sample_data)
1094 self.assertEqual(response.status_code, requests.codes.ok)
1095 res = response.json()
1096 self.assertIn('PCE calculation in progress',
1097 res['output']['configuration-response-common']['response-message'])
1098 time.sleep(self.WAITING)
1100 def test_69_get_ODU4_service2(self):
1101 response = test_utils.get_service_list_request(
1102 "services/service2-ODU4")
1103 self.assertEqual(response.status_code, requests.codes.ok)
1104 res = response.json()
1106 res['services'][0]['administrative-state'], 'inService')
1108 res['services'][0]['service-name'], 'service2-ODU4')
1110 res['services'][0]['connection-type'], 'infrastructure')
1112 res['services'][0]['lifecycle-state'], 'planned')
1115 def test_70_create_1GE_service(self):
1116 self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
1117 self.cr_serv_sample_data["input"]["connection-type"] = "service"
1118 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
1119 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1120 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
1121 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1122 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
1123 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1124 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1125 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1126 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1127 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
1128 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1129 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1130 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1131 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
1132 response = test_utils.service_create_request(self.cr_serv_sample_data)
1133 self.assertEqual(response.status_code, requests.codes.ok)
1134 res = response.json()
1135 self.assertIn('PCE calculation in progress',
1136 res['output']['configuration-response-common']['response-message'])
1137 time.sleep(self.WAITING)
1139 def test_71_get_1GE_service1(self):
1140 response = test_utils.get_service_list_request("services/service1-1GE")
1141 self.assertEqual(response.status_code, requests.codes.ok)
1142 res = response.json()
1144 res['services'][0]['administrative-state'], 'inService')
1146 res['services'][0]['service-name'], 'service1-1GE')
1148 res['services'][0]['connection-type'], 'service')
1150 res['services'][0]['lifecycle-state'], 'planned')
1153 def test_72_check_interface_1GE_CLIENT_spdra(self):
1154 response = test_utils.check_netconf_node_request(
1155 "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1156 self.assertEqual(response.status_code, requests.codes.ok)
1157 res = response.json()
1158 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1159 'administrative-state': 'inService',
1160 'supporting-circuit-pack-name': 'CP1-SFP4',
1161 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1162 'supporting-port': 'CP1-SFP4-P1'
1164 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1165 res['interface'][0])
1166 self.assertDictEqual(
1168 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1170 def test_73_check_interface_ODU0_CLIENT_spdra(self):
1171 response = test_utils.check_netconf_node_request(
1172 "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
1173 self.assertEqual(response.status_code, requests.codes.ok)
1174 res = response.json()
1175 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
1176 'administrative-state': 'inService',
1177 'supporting-circuit-pack-name': 'CP3-SFP1',
1178 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1179 'type': 'org-openroadm-interfaces:otnOdu',
1180 'supporting-port': 'CP3-SFP1-P1'}
1182 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1183 'rate': 'org-openroadm-otn-common-types:ODU0',
1184 'monitoring-mode': 'terminated'}
1186 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1187 res['interface'][0])
1188 self.assertDictEqual(dict(input_dict_2,
1189 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1190 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1191 self.assertDictEqual(
1192 {u'payload-type': u'07', u'exp-payload-type': u'07'},
1193 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1195 def test_74_check_interface_ODU0_NETWORK_spdra(self):
1196 response = test_utils.check_netconf_node_request(
1197 "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
1198 self.assertEqual(response.status_code, requests.codes.ok)
1199 res = response.json()
1200 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
1201 'administrative-state': 'inService',
1202 'supporting-circuit-pack-name': 'CP3-CFP0',
1203 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1204 'type': 'org-openroadm-interfaces:otnOdu',
1205 'supporting-port': 'CP3-CFP0-P1'}
1207 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1208 'rate': 'org-openroadm-otn-common-types:ODU0',
1209 'monitoring-mode': 'monitored'}
1210 input_dict_3 = {'trib-port-number': 1}
1212 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1213 res['interface'][0])
1214 self.assertDictEqual(dict(input_dict_2,
1215 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1216 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1217 self.assertDictEqual(dict(input_dict_3,
1218 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1219 'parent-odu-allocation']),
1220 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1221 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1224 def test_75_check_ODU0_connection_spdra(self):
1225 response = test_utils.check_netconf_node_request(
1227 "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1228 self.assertEqual(response.status_code, requests.codes.ok)
1229 res = response.json()
1232 'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1233 'direction': 'bidirectional'
1236 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1237 res['odu-connection'][0])
1238 self.assertDictEqual({u'dst-if': u'XPDR3-NETWORK1-ODU0-service1-1GE'},
1239 res['odu-connection'][0]['destination'])
1240 self.assertDictEqual({u'src-if': u'XPDR3-CLIENT1-ODU0-service1-1GE'},
1241 res['odu-connection'][0]['source'])
1243 def test_76_check_interface_1GE_CLIENT_spdrc(self):
1244 response = test_utils.check_netconf_node_request(
1245 "SPDR-SC1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1246 self.assertEqual(response.status_code, requests.codes.ok)
1247 res = response.json()
1248 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1249 'administrative-state': 'inService',
1250 'supporting-circuit-pack-name': 'CP3-SFP1',
1251 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1252 'supporting-port': 'CP3-SFP1-P1'
1254 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1255 res['interface'][0])
1256 self.assertDictEqual(
1258 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1260 def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1261 response = test_utils.check_netconf_node_request(
1262 "SPDR-SC1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
1263 self.assertEqual(response.status_code, requests.codes.ok)
1264 res = response.json()
1265 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
1266 'administrative-state': 'inService',
1267 'supporting-circuit-pack-name': 'CP3-SFP1',
1268 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1269 'type': 'org-openroadm-interfaces:otnOdu',
1270 'supporting-port': 'CP3-SFP1-P1'}
1272 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1273 'rate': 'org-openroadm-otn-common-types:ODU0',
1274 'monitoring-mode': 'terminated'}
1276 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1277 res['interface'][0])
1278 self.assertDictEqual(dict(input_dict_2,
1279 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1280 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1281 self.assertDictEqual(
1282 {u'payload-type': u'07', u'exp-payload-type': u'07'},
1283 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1285 def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1286 response = test_utils.check_netconf_node_request(
1287 "SPDR-SC1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
1288 self.assertEqual(response.status_code, requests.codes.ok)
1289 res = response.json()
1290 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
1291 'administrative-state': 'inService',
1292 'supporting-circuit-pack-name': 'CP3-CFP0',
1293 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1294 'type': 'org-openroadm-interfaces:otnOdu',
1295 'supporting-port': 'CP3-CFP0-P1'}
1297 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1298 'rate': 'org-openroadm-otn-common-types:ODU0',
1299 'monitoring-mode': 'monitored'}
1301 input_dict_3 = {'trib-port-number': 1}
1303 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1304 res['interface'][0])
1305 self.assertDictEqual(dict(input_dict_2,
1306 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1307 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1308 self.assertDictEqual(dict(input_dict_3,
1309 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1310 'parent-odu-allocation']),
1311 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1312 'parent-odu-allocation'])
1314 res['interface'][0][
1315 'org-openroadm-otn-odu-interfaces:odu'][
1316 'parent-odu-allocation']['trib-slots'])
1318 def test_79_check_ODU0_connection_spdrc(self):
1319 response = test_utils.check_netconf_node_request(
1321 "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
1322 self.assertEqual(response.status_code, requests.codes.ok)
1323 res = response.json()
1326 'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
1327 'direction': 'bidirectional'
1330 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1331 res['odu-connection'][0])
1332 self.assertDictEqual({u'dst-if': u'XPDR3-NETWORK1-ODU0-service1-1GE'},
1333 res['odu-connection'][0]['destination'])
1334 self.assertDictEqual({u'src-if': u'XPDR3-CLIENT1-ODU0-service1-1GE'},
1335 res['odu-connection'][0]['source'])
1337 def test_80_check_otn_topo_links(self):
1338 response = test_utils.get_otn_topo_request()
1339 self.assertEqual(response.status_code, requests.codes.ok)
1340 res = response.json()
1341 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1342 self.assertEqual(nb_links, 4)
1343 for link in res['network'][0]['ietf-network-topology:link']:
1344 linkId = link['link-id']
1345 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1346 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1348 link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1350 link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1352 def test_81_check_otn_topo_tp(self):
1353 response = test_utils.get_otn_topo_request()
1354 res = response.json()
1355 for node in res['network'][0]['node']:
1356 if node['node-id'] == 'SPDR-SA1-XPDR3' or 'SPDR-SC1-XPDR3':
1357 tpList = node['ietf-network-topology:termination-point']
1359 if tp['tp-id'] == 'XPDR3-NETWORK1':
1360 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1361 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1362 tsPoolList = list(range(1, 2))
1364 tsPoolList, xpdrTpPortConAt['ts-pool'])
1366 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1368 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1370 def test_82_delete_1GE_service(self):
1371 response = test_utils.service_delete_request("service1-1GE")
1372 self.assertEqual(response.status_code, requests.codes.ok)
1373 res = response.json()
1374 self.assertIn('Renderer service delete in progress',
1375 res['output']['configuration-response-common']['response-message'])
1376 time.sleep(self.WAITING)
1378 def test_83_check_service_list(self):
1379 response = test_utils.get_service_list_request("")
1380 self.assertEqual(response.status_code, requests.codes.ok)
1381 res = response.json()
1382 self.assertEqual(len(res['service-list']['services']), 2)
1385 def test_84_check_no_ODU0_connection_spdra(self):
1386 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1387 self.assertEqual(response.status_code, requests.codes.ok)
1388 res = response.json()
1389 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1392 def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1393 response = test_utils.check_netconf_node_request(
1394 "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1")
1395 self.assertEqual(response.status_code, requests.codes.conflict)
1397 def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1398 response = test_utils.check_netconf_node_request(
1399 "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1")
1400 self.assertEqual(response.status_code, requests.codes.conflict)
1402 def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1403 response = test_utils.check_netconf_node_request(
1404 "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
1405 self.assertEqual(response.status_code, requests.codes.conflict)
1407 def test_88_check_otn_topo_links(self):
1408 response = test_utils.get_otn_topo_request()
1409 self.assertEqual(response.status_code, requests.codes.ok)
1410 res = response.json()
1411 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1412 self.assertEqual(nb_links, 4)
1413 for link in res['network'][0]['ietf-network-topology:link']:
1414 linkId = link['link-id']
1415 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1416 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1418 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1420 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1422 def test_89_check_otn_topo_tp(self):
1423 response = test_utils.get_otn_topo_request()
1424 res = response.json()
1425 for node in res['network'][0]['node']:
1426 if (node['node-id'] == 'SPDR-SA1-XPDR3' or 'SPDR-SC1-XPDR3'):
1427 tpList = node['ietf-network-topology:termination-point']
1429 if tp['tp-id'] == 'XPDR3-NETWORK1':
1430 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1431 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1433 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1435 def test_90_delete_ODU4_service(self):
1436 response = test_utils.service_delete_request("service2-ODU4")
1437 self.assertEqual(response.status_code, requests.codes.ok)
1438 res = response.json()
1439 self.assertIn('Renderer service delete in progress',
1440 res['output']['configuration-response-common']['response-message'])
1441 time.sleep(self.WAITING)
1443 def test_91_delete_OCH_OTU4_service(self):
1444 response = test_utils.service_delete_request("service2-OCH-OTU4")
1445 self.assertEqual(response.status_code, requests.codes.ok)
1446 res = response.json()
1447 self.assertIn('Renderer service delete in progress',
1448 res['output']['configuration-response-common']['response-message'])
1449 time.sleep(self.WAITING)
1451 def test_92_disconnect_xponders_from_roadm(self):
1452 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1453 response = test_utils.get_ordm_topo_request("")
1454 self.assertEqual(response.status_code, requests.codes.ok)
1455 res = response.json()
1456 links = res['network'][0]['ietf-network-topology:link']
1458 if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1459 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
1460 link_name = link["link-id"]
1461 response = test_utils.delete_request(url+link_name)
1462 self.assertEqual(response.status_code, requests.codes.ok)
1464 def test_93_check_openroadm_topology(self):
1465 response = test_utils.get_ordm_topo_request("")
1466 self.assertEqual(response.status_code, requests.codes.ok)
1467 res = response.json()
1468 links = res['network'][0]['ietf-network-topology:link']
1469 self.assertEqual(18, len(links), 'Topology should contain 18 links')
1471 def test_94_disconnect_spdrA(self):
1472 response = test_utils.unmount_device("SPDR-SA1")
1473 self.assertEqual(response.status_code, requests.codes.ok,
1474 test_utils.CODE_SHOULD_BE_200)
1476 def test_95_disconnect_spdrC(self):
1477 response = test_utils.unmount_device("SPDR-SC1")
1478 self.assertEqual(response.status_code, requests.codes.ok,
1479 test_utils.CODE_SHOULD_BE_200)
1481 def test_96_disconnect_roadmA(self):
1482 response = test_utils.unmount_device("ROADM-A1")
1483 self.assertEqual(response.status_code, requests.codes.ok,
1484 test_utils.CODE_SHOULD_BE_200)
1486 def test_97_disconnect_roadmC(self):
1487 response = test_utils.unmount_device("ROADM-C1")
1488 self.assertEqual(response.status_code, requests.codes.ok,
1489 test_utils.CODE_SHOULD_BE_200)
1492 if __name__ == "__main__":
1493 unittest.main(verbosity=2)