3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
19 # pylint: disable=wrong-import-order
21 sys.path.append('transportpce_tests/common/')
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils # nopep8
27 class TransportPCEtesting(unittest.TestCase):
30 WAITING = 20 # nominal value is 300
31 NODE_VERSION = '2.2.1'
33 cr_serv_sample_data = {"input": {
34 "sdnc-request-header": {
35 "request-id": "request-1",
36 "rpc-action": "service-create",
37 "request-system-id": "appname"
39 "service-name": "service-OCH-OTU4-AB",
40 "common-id": "commonId",
41 "connection-type": "infrastructure",
43 "service-rate": "100",
44 "node-id": "SPDR-SA1",
45 "service-format": "OTU",
46 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
50 "port-device-name": "SPDR-SA1-XPDR1",
52 "port-name": "XPDR1-NETWORK1",
53 "port-rack": "000000.00",
54 "port-shelf": "Chassis#1"
57 "lgx-device-name": "Some lgx-device-name",
58 "lgx-port-name": "Some lgx-port-name",
59 "lgx-port-rack": "000000.00",
60 "lgx-port-shelf": "00"
66 "port-device-name": "SPDR-SA1-XPDR1",
68 "port-name": "XPDR1-NETWORK1",
69 "port-rack": "000000.00",
70 "port-shelf": "Chassis#1"
73 "lgx-device-name": "Some lgx-device-name",
74 "lgx-port-name": "Some lgx-port-name",
75 "lgx-port-rack": "000000.00",
76 "lgx-port-shelf": "00"
83 "service-rate": "100",
84 "node-id": "SPDR-SB1",
85 "service-format": "OTU",
86 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
90 "port-device-name": "SPDR-SB1-XPDR2",
92 "port-name": "XPDR2-NETWORK1",
93 "port-rack": "000000.00",
94 "port-shelf": "Chassis#1"
97 "lgx-device-name": "Some lgx-device-name",
98 "lgx-port-name": "Some lgx-port-name",
99 "lgx-port-rack": "000000.00",
100 "lgx-port-shelf": "00"
106 "port-device-name": "SPDR-SB1-XPDR2",
107 "port-type": "fixed",
108 "port-name": "XPDR2-NETWORK1",
109 "port-rack": "000000.00",
110 "port-shelf": "Chassis#1"
113 "lgx-device-name": "Some lgx-device-name",
114 "lgx-port-name": "Some lgx-port-name",
115 "lgx-port-rack": "000000.00",
116 "lgx-port-shelf": "00"
122 "due-date": "2018-06-15T00:00:01Z",
123 "operator-contact": "pw1234"
129 cls.processes = test_utils.start_tpce()
130 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
131 ('spdrb', cls.NODE_VERSION),
132 ('spdrc', cls.NODE_VERSION),
133 ('roadma', cls.NODE_VERSION),
134 ('roadmb', cls.NODE_VERSION),
135 ('roadmc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Stop each process held in cls.processes, one after the other, then
    # print a confirmation line.
    # pylint: disable=not-an-iterable
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
def test_001_connect_spdrA(self):
    # Mount SPDR-SA1 against the 'spdra' simulator; the request must answer 201.
    sim = ('spdra', self.NODE_VERSION)
    reply = test_utils.mount_device("SPDR-SA1", sim)
    self.assertEqual(reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_002_connect_spdrB(self):
    # Mount SPDR-SB1 against the 'spdrb' simulator; the request must answer 201.
    sim = ('spdrb', self.NODE_VERSION)
    reply = test_utils.mount_device("SPDR-SB1", sim)
    self.assertEqual(reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_003_connect_spdrC(self):
    # Mount SPDR-SC1 against the 'spdrc' simulator; the request must answer 201.
    sim = ('spdrc', self.NODE_VERSION)
    reply = test_utils.mount_device("SPDR-SC1", sim)
    self.assertEqual(reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_004_connect_rdmA(self):
    # Mount ROADM-A1 against the 'roadma' simulator; the request must answer 201.
    sim = ('roadma', self.NODE_VERSION)
    reply = test_utils.mount_device("ROADM-A1", sim)
    self.assertEqual(reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_005_connect_rdmB(self):
    # Mount ROADM-B1 against the 'roadmb' simulator; the request must answer 201.
    sim = ('roadmb', self.NODE_VERSION)
    reply = test_utils.mount_device("ROADM-B1", sim)
    self.assertEqual(reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_006_connect_rdmC(self):
    # Mount ROADM-C1 against the 'roadmc' simulator; the request must answer 201.
    sim = ('roadmc', self.NODE_VERSION)
    reply = test_utils.mount_device("ROADM-C1", sim)
    self.assertEqual(reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_007_connect_sprdA_1_N1_to_roadmA_PP1(self):
    # Send the xpdr-to-rdm link request for SPDR-SA1 / ROADM-A1 SRG1-PP1-TXRX
    # and check the result text returned in the RPC output.
    reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result)
def test_008_connect_roadmA_PP1_to_spdrA_1_N1(self):
    # Send the rdm-to-xpdr link request for SPDR-SA1 / ROADM-A1 SRG1-PP1-TXRX
    # and check the result text returned in the RPC output.
    reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result)
def test_009_connect_sprdC_1_N1_to_roadmC_PP1(self):
    # Send the xpdr-to-rdm link request for SPDR-SC1 / ROADM-C1 SRG1-PP1-TXRX
    # and check the result text returned in the RPC output.
    reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result)
def test_010_connect_roadmC_PP1_to_spdrC_1_N1(self):
    # Send the rdm-to-xpdr link request for SPDR-SC1 / ROADM-C1 SRG1-PP1-TXRX
    # and check the result text returned in the RPC output.
    reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result)
def test_011_connect_sprdB_2_N1_to_roadmB_PP1(self):
    # Send the xpdr-to-rdm link request for SPDR-SB1 / ROADM-B1 SRG1-PP1-TXRX
    # and check the result text returned in the RPC output.
    reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SB1", "2", "1", "ROADM-B1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result)
def test_012_connect_roadmB_PP1_to_spdrB_2_N1(self):
    # Send the rdm-to-xpdr link request for SPDR-SB1 / ROADM-B1 SRG1-PP1-TXRX
    # and check the result text returned in the RPC output.
    reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SB1", "2", "1", "ROADM-B1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result)
def test_013_connect_sprdB_2_N2_to_roadmB_PP2(self):
    # Send the xpdr-to-rdm link request for SPDR-SB1 / ROADM-B1 SRG1-PP2-TXRX
    # and check the result text returned in the RPC output.
    reply = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SB1", "2", "2", "ROADM-B1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result)
def test_014_connect_roadmB_PP2_to_spdrB_2_N2(self):
    # Send the rdm-to-xpdr link request for SPDR-SB1 / ROADM-B1 SRG1-PP2-TXRX
    # and check the result text returned in the RPC output.
    reply = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SB1", "2", "2", "ROADM-B1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result)
249 def test_015_add_omsAttributes_ROADMA_ROADMB(self):
# Config ROADMA-ROADMB oms-attributes
252 "auto-spanloss": "true",
253 "spanloss-base": 11.4,
254 "spanloss-current": 12,
255 "engineered-spanloss": 12.2,
256 "link-concatenation": [{
259 "SRLG-length": 100000,
261 response = test_utils.add_oms_attr_request(
262 "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
263 self.assertEqual(response.status_code, requests.codes.created)
265 def test_016_add_omsAttributes_ROADMB_ROADMA(self):
# Config ROADMB-ROADMA oms-attributes
268 "auto-spanloss": "true",
269 "spanloss-base": 11.4,
270 "spanloss-current": 12,
271 "engineered-spanloss": 12.2,
272 "link-concatenation": [{
275 "SRLG-length": 100000,
277 response = test_utils.add_oms_attr_request(
278 "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
279 self.assertEqual(response.status_code, requests.codes.created)
281 def test_017_add_omsAttributes_ROADMB_ROADMC(self):
# Config ROADMB-ROADMC oms-attributes
284 "auto-spanloss": "true",
285 "spanloss-base": 11.4,
286 "spanloss-current": 12,
287 "engineered-spanloss": 12.2,
288 "link-concatenation": [{
291 "SRLG-length": 100000,
293 response = test_utils.add_oms_attr_request(
294 "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
295 self.assertEqual(response.status_code, requests.codes.created)
297 def test_018_add_omsAttributes_ROADMC_ROADMB(self):
# Config ROADMC-ROADMB oms-attributes
300 "auto-spanloss": "true",
301 "spanloss-base": 11.4,
302 "spanloss-current": 12,
303 "engineered-spanloss": 12.2,
304 "link-concatenation": [{
307 "SRLG-length": 100000,
309 response = test_utils.add_oms_attr_request(
310 "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
311 self.assertEqual(response.status_code, requests.codes.created)
def test_019_create_OTS_ROADMA_DEG1(self):
    # Request the OTS/OMS interfaces for DEG1-TTP-TXRX on ROADM-A1 and check
    # the result text returned in the RPC output.
    reply = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn(
        'Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
        result)
def test_020_create_OTS_ROADMB_DEG1(self):
    # Request the OTS/OMS interfaces for DEG1-TTP-TXRX on ROADM-B1 and check
    # the result text returned in the RPC output.
    reply = test_utils.create_ots_oms_request("ROADM-B1", "DEG1-TTP-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn(
        'Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1',
        result)
def test_021_create_OTS_ROADMB_DEG2(self):
    # Request the OTS/OMS interfaces for DEG2-TTP-TXRX on ROADM-B1 and check
    # the result text returned in the RPC output.
    reply = test_utils.create_ots_oms_request("ROADM-B1", "DEG2-TTP-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn(
        'Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1',
        result)
def test_022_create_OTS_ROADMC_DEG2(self):
    # Request the OTS/OMS interfaces for DEG2-TTP-TXRX on ROADM-C1 and check
    # the result text returned in the RPC output.
    reply = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result = reply.json()["output"]["result"]
    self.assertIn(
        'Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
        result)
345 def test_023_calculate_span_loss_base_all(self):
346 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
352 response = test_utils.post_request(url, data)
353 self.assertEqual(response.status_code, requests.codes.ok)
354 res = response.json()
355 self.assertIn('Success',
356 res["output"]["result"])
359 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
360 }, res["output"]["spans"])
363 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
364 }, res["output"]["spans"])
367 "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
368 }, res["output"]["spans"])
371 "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
372 }, res["output"]["spans"])
375 "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
376 }, res["output"]["spans"])
379 "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
380 }, res["output"]["spans"])
def test_024_check_otn_topology(self):
    # At this point the first network entry of the OTN topology must list
    # 9 nodes and carry no 'ietf-network-topology:link' key at all.
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    network = reply.json()['network'][0]
    node_count = len(network['node'])
    self.assertEqual(node_count, 9, 'There should be 9 nodes')
    self.assertNotIn('ietf-network-topology:link', network,
                     'otn-topology should have no link')
392 # test service-create for OCH-OTU4 service from spdrA to spdrB
def test_025_create_OCH_OTU4_service_AB(self):
    # Submit the class-level sample request unchanged and check that the
    # answer reports the PCE computation as started.
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
401 def test_026_get_OCH_OTU4_service_AB(self):
402 response = test_utils.get_service_list_request(
403 "services/service-OCH-OTU4-AB")
404 self.assertEqual(response.status_code, requests.codes.ok)
405 res = response.json()
407 res['services'][0]['administrative-state'], 'inService')
409 res['services'][0]['service-name'], 'service-OCH-OTU4-AB')
411 res['services'][0]['connection-type'], 'infrastructure')
413 res['services'][0]['lifecycle-state'], 'planned')
416 def test_027_check_otn_topo_otu4_links(self):
417 response = test_utils.get_otn_topo_request()
418 self.assertEqual(response.status_code, requests.codes.ok)
419 res = response.json()
420 nb_links = len(res['network'][0]['ietf-network-topology:link'])
421 self.assertEqual(nb_links, 2)
422 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
423 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
424 for link in res['network'][0]['ietf-network-topology:link']:
425 self.assertIn(link['link-id'], listLinkId)
427 link['transportpce-networkutils:otn-link-type'], 'OTU4')
429 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
431 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
433 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
435 link['org-openroadm-common-network:opposite-link'], listLinkId)
437 # test service-create for OCH-OTU4 service from spdrB to spdrC
def test_028_create_OCH_OTU4_service_BC(self):
    # Rewrite the shared sample request in place (it is a class attribute, so
    # the changes stay visible to later tests), then submit it.
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service-OCH-OTU4-BC"
    # (end key, node-id, clli, port-device-name, port-name) for each end.
    endpoints = (
        ("service-a-end", "SPDR-SB1", "NodeSB", "SPDR-SB1-XPDR2", "XPDR2-NETWORK2"),
        ("service-z-end", "SPDR-SC1", "NodeSC", "SPDR-SC1-XPDR1", "XPDR1-NETWORK1"),
    )
    for end_key, node_id, clli, device_name, port_name in endpoints:
        end = request[end_key]
        end["node-id"] = node_id
        end["clli"] = clli
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = port_name

    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
461 def test_029_get_OCH_OTU4_service_BC(self):
462 response = test_utils.get_service_list_request(
463 "services/service-OCH-OTU4-BC")
464 self.assertEqual(response.status_code, requests.codes.ok)
465 res = response.json()
467 res['services'][0]['administrative-state'], 'inService')
469 res['services'][0]['service-name'], 'service-OCH-OTU4-BC')
471 res['services'][0]['connection-type'], 'infrastructure')
473 res['services'][0]['lifecycle-state'], 'planned')
476 def test_030_check_otn_topo_otu4_links(self):
477 response = test_utils.get_otn_topo_request()
478 self.assertEqual(response.status_code, requests.codes.ok)
479 res = response.json()
480 nb_links = len(res['network'][0]['ietf-network-topology:link'])
481 self.assertEqual(nb_links, 4)
482 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
483 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
484 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
485 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2']
486 for link in res['network'][0]['ietf-network-topology:link']:
487 self.assertIn(link['link-id'], listLinkId)
489 link['transportpce-networkutils:otn-link-type'], 'OTU4')
491 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
493 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
495 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
497 link['org-openroadm-common-network:opposite-link'], listLinkId)
499 # test service-create for ODU4 service from spdrA to spdrC via spdrB
def test_031_create_ODU4_service(self):
    # Rewrite the shared sample request in place for an ODU4 service between
    # SPDR-SA1 and SPDR-SC1: each end swaps its OTU rate key for an ODU one.
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service-ODU4-ABC"
    # (end key, node-id, clli, port-device-name, port-name) for each end.
    endpoints = (
        ("service-a-end", "SPDR-SA1", "NodeSA", "SPDR-SA1-XPDR1", "XPDR1-NETWORK1"),
        ("service-z-end", "SPDR-SC1", "NodeSC", "SPDR-SC1-XPDR1", "XPDR1-NETWORK1"),
    )
    for end_key, node_id, clli, device_name, port_name in endpoints:
        end = request[end_key]
        end["node-id"] = node_id
        end["clli"] = clli
        end["service-format"] = "ODU"
        del end["otu-service-rate"]
        end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = port_name

    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
529 def test_032_get_ODU4_service_ABC(self):
530 response = test_utils.get_service_list_request(
531 "services/service-ODU4-ABC")
532 self.assertEqual(response.status_code, requests.codes.ok)
533 res = response.json()
535 res['services'][0]['administrative-state'], 'inService')
537 res['services'][0]['service-name'], 'service-ODU4-ABC')
539 res['services'][0]['connection-type'], 'infrastructure')
541 res['services'][0]['lifecycle-state'], 'planned')
def test_033_check_interface_ODU4_spdra(self):
    # Read the XPDR1-NETWORK1-ODU4 interface on SPDR-SA1, compare it with the
    # expected attributes, then cross-check its SAPI/DAPI values against the
    # same interface read from SPDR-SC1.
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    interface = res['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-CFP0-P1'}
    # SAPI/DAPI are added in the Otu4 renderer
    input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
                    'rate': 'org-openroadm-otn-common-types:ODU4',
                    'expected-dapi': 'H/OelLynehI=',
                    'expected-sapi': 'AMf1n5hK6Xkk',
                    'tx-dapi': 'AMf1n5hK6Xkk',
                    'tx-sapi': 'H/OelLynehI='}

    # NOTE(review): the trailing arguments of the next two calls were cut off
    # in the reviewed copy and have been restored - confirm against the
    # canonical file.
    # NOTE(review): with dict(expected, **actual) the response values override
    # the expected ones, so this first check only proves the keys are present.
    self.assertDictEqual(dict(input_dict_1, **interface),
                         interface)
    # Here the expected values override the response, so values are compared.
    self.assertDictEqual(dict(odu, **input_dict_2),
                         odu)
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'},
        odu['opu'])
    response2 = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
    # Check the second request itself; the first one was already verified.
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    # What this end transmits is what the other end must expect, and
    # the other way round.
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
def test_034_check_interface_ODU4_spdrc(self):
    # Read the XPDR1-NETWORK1-ODU4 interface on SPDR-SC1, compare it with the
    # expected attributes, then cross-check its SAPI/DAPI values against the
    # same interface read from SPDR-SA1.
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    interface = res['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-CFP0-P1'}
    # SAPI/DAPI are added in the Otu4 renderer
    input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
                    'rate': 'org-openroadm-otn-common-types:ODU4',
                    'expected-dapi': 'AMf1n5hK6Xkk',
                    'expected-sapi': 'H/OelLynehI=',
                    'tx-dapi': 'H/OelLynehI=',
                    'tx-sapi': 'AMf1n5hK6Xkk'}
    # NOTE(review): the trailing arguments of the next two calls were cut off
    # in the reviewed copy and have been restored - confirm against the
    # canonical file.
    # NOTE(review): with dict(expected, **actual) the response values override
    # the expected ones, so this first check only proves the keys are present.
    self.assertDictEqual(dict(input_dict_1, **interface),
                         interface)
    # Here the expected values override the response, so values are compared.
    self.assertDictEqual(dict(odu, **input_dict_2),
                         odu)
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'},
        odu['opu'])
    response2 = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
    # Check the second request itself; the first one was already verified.
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    # What this end transmits is what the other end must expect, and
    # the other way round.
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
615 def test_035_check_interface_ODU4_NETWORK1_spdrb(self):
616 response = test_utils.check_netconf_node_request(
617 "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
618 self.assertEqual(response.status_code, requests.codes.ok)
619 res = response.json()
620 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
621 'administrative-state': 'inService',
622 'supporting-circuit-pack-name': 'CP5-CFP',
623 'type': 'org-openroadm-interfaces:otnOdu',
624 'supporting-port': 'CP5-CFP-P1'}
626 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
627 'rate': 'org-openroadm-otn-common-types:ODU4',
628 'monitoring-mode': 'monitored'}
630 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
632 self.assertDictEqual(dict(input_dict_2,
633 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
634 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
635 self.assertNotIn('opu',
636 dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
638 def test_036_check_interface_ODU4_NETWORK2_spdrb(self):
639 response = test_utils.check_netconf_node_request(
640 "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
641 self.assertEqual(response.status_code, requests.codes.ok)
642 res = response.json()
643 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
644 'administrative-state': 'inService',
645 'supporting-circuit-pack-name': 'CP6-CFP',
646 'type': 'org-openroadm-interfaces:otnOdu',
647 'supporting-port': 'CP6-CFP-P1'}
649 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
650 'rate': 'org-openroadm-otn-common-types:ODU4',
651 'monitoring-mode': 'monitored'}
653 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
655 self.assertDictEqual(dict(input_dict_2,
656 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
657 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
658 self.assertNotIn('opu',
659 dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
661 def test_037_check_ODU4_connection_spdrb(self):
662 response = test_utils.check_netconf_node_request(
664 "odu-connection/XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4")
665 self.assertEqual(response.status_code, requests.codes.ok)
666 res = response.json()
669 'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
670 'direction': 'bidirectional'
673 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
674 res['odu-connection'][0])
675 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU4'},
676 res['odu-connection'][0]['destination'])
677 self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU4'},
678 res['odu-connection'][0]['source'])
680 def test_038_check_otn_topo_links(self):
681 response = test_utils.get_otn_topo_request()
682 self.assertEqual(response.status_code, requests.codes.ok)
683 res = response.json()
684 nb_links = len(res['network'][0]['ietf-network-topology:link'])
685 self.assertEqual(nb_links, 6)
686 for link in res['network'][0]['ietf-network-topology:link']:
687 if 'OTU4' in link['link-id']:
689 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
691 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
692 elif 'ODTU4' in link['link-id']:
694 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
696 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
698 link['transportpce-networkutils:otn-link-type'], 'ODTU4')
700 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
701 self.assertIn(link['org-openroadm-common-network:opposite-link'],
702 ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
703 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
705 self.fail("this link should not exist")
707 def test_039_check_otn_topo_tp(self):
708 response = test_utils.get_otn_topo_request()
709 res = response.json()
710 for node in res['network'][0]['node']:
711 if node['node-id'] == 'SPDR-SA1-XPDR1' or node['node-id'] == 'SPDR-SC1-XPDR1':
712 tpList = node['ietf-network-topology:termination-point']
714 if tp['tp-id'] == 'XPDR1-NETWORK1':
715 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
716 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
718 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
719 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
720 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
722 # test service-create for 10GE service from spdr to spdr
def test_040_create_10GE_service(self):
    # Rewrite the shared sample request in place for a 10GE service on the
    # XPDR1-CLIENT1 ports of both ends, then submit it.
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service1-10GE"
    request["connection-type"] = "service"
    for end_key in ("service-a-end", "service-z-end"):
        end = request[end_key]
        end["service-rate"] = "10"
        end["service-format"] = "Ethernet"
        del end["odu-service-rate"]
        for direction in ("tx-direction", "rx-direction"):
            end[direction][0]["port"]["port-name"] = "XPDR1-CLIENT1"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
743 def test_041_get_10GE_service1(self):
744 response = test_utils.get_service_list_request(
745 "services/service1-10GE")
746 self.assertEqual(response.status_code, requests.codes.ok)
747 res = response.json()
749 res['services'][0]['administrative-state'], 'inService')
751 res['services'][0]['service-name'], 'service1-10GE')
753 res['services'][0]['connection-type'], 'service')
755 res['services'][0]['lifecycle-state'], 'planned')
758 def test_042_check_interface_10GE_CLIENT_spdra(self):
759 response = test_utils.check_netconf_node_request(
760 "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
761 self.assertEqual(response.status_code, requests.codes.ok)
762 res = response.json()
763 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
764 'administrative-state': 'inService',
765 'supporting-circuit-pack-name': 'CP1-SFP4',
766 'type': 'org-openroadm-interfaces:ethernetCsmacd',
767 'supporting-port': 'CP1-SFP4-P1'
769 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
771 self.assertDictEqual(
773 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_043_check_interface_ODU2E_CLIENT_spdra(self):
    # Read the XPDR1-CLIENT1-ODU2e interface on SPDR-SA1, compare it with the
    # expected attributes, then cross-check its SAPI/DAPI values against the
    # same interface read from SPDR-SC1.
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    interface = res['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-SFP4',
                    'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-SFP4-P1'}
    # NOTE(review): the opening line of this literal and the trailing argument
    # of the first assertDictEqual were cut off in the reviewed copy and have
    # been restored - confirm against the canonical file.
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU2e',
        'monitoring-mode': 'terminated',
        'expected-dapi': 'B68VWipZAU0=',
        'expected-sapi': 'BcwI5xz79t8=',
        'tx-dapi': 'BcwI5xz79t8=',
        'tx-sapi': 'B68VWipZAU0='}

    # NOTE(review): with dict(expected, **actual) the response values override
    # the expected ones, so these two checks only prove the keys are present.
    self.assertDictEqual(dict(input_dict_1, **interface),
                         interface)
    self.assertDictEqual(dict(input_dict_2,
                              **odu),
                         odu)
    self.assertDictEqual(
        {'payload-type': '03', 'exp-payload-type': '03'},
        odu['opu'])
    response2 = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e")
    # Check the second request itself; the first one was already verified.
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    # What this end transmits is what the other end must expect, and
    # the other way round.
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
# Check the ODU2e network interface on SPDR-SA1 and its
# 'parent-odu-allocation' ('trib-port-number' key, slot 1 in 'trib-slots').
def test_044_check_interface_ODU2E_NETWORK_spdra(self):
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    expected_interface = {'name': 'XPDR1-NETWORK1-ODU2e',
                          'administrative-state': 'inService',
                          'supporting-circuit-pack-name': 'CP1-CFP0',
                          'supporting-interface': 'XPDR1-NETWORK1-ODU4',
                          'type': 'org-openroadm-interfaces:otnOdu',
                          'supporting-port': 'CP1-CFP0-P1'}
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU2e',
        'monitoring-mode': 'monitored'}
    expected_allocation = {'trib-port-number': 1}

    interface = payload['interface'][0]
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    allocation = odu['parent-odu-allocation']
    self.assertDictEqual(dict(expected_allocation, **allocation), allocation)
    self.assertIn(1, allocation['trib-slots'])
# Read the odu-connection "XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e" through
# test_utils.check_netconf_node_request, require requests.codes.ok, then check
# the first 'odu-connection' entry: input_dict_1 merged with it must equal it,
# 'destination' must be {'dst-if': 'XPDR1-NETWORK1-ODU2e'} and 'source' must be
# {'src-if': 'XPDR1-CLIENT1-ODU2e'}.
# NOTE(review): the first argument of the request and the statements opening
# and closing input_dict_1 are not visible in this copy - TODO restore them
# (presumably the node is SPDR-SA1, judging by the method name; confirm).
841 def test_045_check_ODU2E_connection_spdra(self):
842 response = test_utils.check_netconf_node_request(
844 "odu-connection/XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e")
845 self.assertEqual(response.status_code, requests.codes.ok)
846 res = response.json()
849 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
850 'direction': 'bidirectional'
# The returned entry overrides input_dict_1 in this merge, so only the
# presence of the expected keys is verified.
853 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
854 res['odu-connection'][0])
855 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
856 res['odu-connection'][0]['destination'])
857 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
858 res['odu-connection'][0]['source'])
# Read interface XPDR1-CLIENT1-ETHERNET10G on SPDR-SC1, require
# requests.codes.ok, then compare the first 'interface' entry and its
# 'org-openroadm-ethernet-interfaces:ethernet' container with expected dicts.
# NOTE(review): the closing brace of input_dict, the second argument of the
# first assertDictEqual and the expected ethernet dict are not visible in
# this copy of the file - TODO restore them before relying on this test.
860 def test_046_check_interface_10GE_CLIENT_spdrc(self):
861 response = test_utils.check_netconf_node_request(
862 "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
863 self.assertEqual(response.status_code, requests.codes.ok)
864 res = response.json()
865 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
866 'administrative-state': 'inService',
867 'supporting-circuit-pack-name': 'CP1-SFP4',
868 'type': 'org-openroadm-interfaces:ethernetCsmacd',
869 'supporting-port': 'CP1-SFP4-P1'
# The returned entry overrides input_dict in this merge.
871 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
873 self.assertDictEqual(
875 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
# Check the ODU2e client interface on SPDR-SC1, then compare its SAPI/DAPI
# values with the ones read back from the same interface on SPDR-SA1.
def test_047_check_interface_ODU2E_CLIENT_spdrc(self):
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-SFP4',
                    'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-SFP4-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU2e',
        'monitoring-mode': 'terminated',
        'expected-dapi': 'BcwI5xz79t8=',
        'expected-sapi': 'B68VWipZAU0=',
        'tx-dapi': 'B68VWipZAU0=',
        'tx-sapi': 'BcwI5xz79t8='}

    # NOTE(review): in dict(expected, **actual) the returned values override
    # the expected ones, so these two checks only prove the keys are present.
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    self.assertDictEqual(
        {'payload-type': '03', 'exp-payload-type': '03'}, odu['opu'])

    response2 = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e")
    # Validate the second reply itself before decoding it.
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    # The SPDR-SA1 side must carry the swapped SAPI/DAPI pair.
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
# Check the ODU2e network interface on SPDR-SC1 and its
# 'parent-odu-allocation' ('trib-port-number' key, slot 1 in 'trib-slots').
def test_048_check_interface_ODU2E_NETWORK_spdrc(self):
    reply = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    expected_interface = {'name': 'XPDR1-NETWORK1-ODU2e',
                          'administrative-state': 'inService',
                          'supporting-circuit-pack-name': 'CP1-CFP0',
                          'supporting-interface': 'XPDR1-NETWORK1-ODU4',
                          'type': 'org-openroadm-interfaces:otnOdu',
                          'supporting-port': 'CP1-CFP0-P1'}
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU2e',
        'monitoring-mode': 'monitored'}
    expected_allocation = {'trib-port-number': 1}

    interface = payload['interface'][0]
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    allocation = odu['parent-odu-allocation']
    self.assertDictEqual(dict(expected_allocation, **allocation), allocation)
    self.assertIn(1, allocation['trib-slots'])
# Read the odu-connection "XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e" through
# test_utils.check_netconf_node_request, require requests.codes.ok, then check
# the first 'odu-connection' entry: input_dict_1 merged with it must equal it,
# 'destination' must be {'dst-if': 'XPDR1-NETWORK1-ODU2e'} and 'source' must be
# {'src-if': 'XPDR1-CLIENT1-ODU2e'}.
# NOTE(review): the first argument of the request and the statements opening
# and closing input_dict_1 are not visible in this copy - TODO restore them
# (presumably the node is SPDR-SC1, judging by the method name; confirm).
946 def test_049_check_ODU2E_connection_spdrc(self):
947 response = test_utils.check_netconf_node_request(
949 "odu-connection/XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e")
950 self.assertEqual(response.status_code, requests.codes.ok)
951 res = response.json()
954 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
955 'direction': 'bidirectional'
# The returned entry overrides input_dict_1 in this merge, so only the
# presence of the expected keys is verified.
958 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
959 res['odu-connection'][0])
960 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
961 res['odu-connection'][0]['destination'])
962 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
963 res['odu-connection'][0]['source'])
# The OTN topology must expose 6 links; the two ODTU4 links between SPDR-SA1
# and SPDR-SC1 must report 90000 available and 10000 used bandwidth.
def test_050_check_otn_topo_links(self):
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()
    link_count = len(topology['network'][0]['ietf-network-topology:link'])
    self.assertEqual(link_count, 6)
    odtu4_link_ids = (
        'ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')
    for link in topology['network'][0]['ietf-network-topology:link']:
        if link['link-id'] not in odtu4_link_ids:
            continue
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
# Check the XPDR1-NETWORK1 termination point of SPDR-SA1-XPDR1 and
# SPDR-SC1-XPDR1 in the OTN topology: 72 entries left in 'ts-pool' and 79 in
# the first 'tpn-pool', with entry 1 no longer available in the latter.
def test_051_check_otn_topo_tp(self):
    response = test_utils.get_otn_topo_request()
    # Stop on an HTTP error instead of failing later on a missing key.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
            tsPoolList = list(range(1, 9))
            # NOTE(review): this tests whether the whole list is one element
            # of the pool; a per-slot check was presumably intended - confirm.
            self.assertNotIn(
                tsPoolList, xpdrTpPortConAt['ts-pool'])
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
            self.assertNotIn(
                1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
# Ask for the deletion of "service1-10GE", check the acknowledgement message,
# then pause for WAITING seconds.
def test_052_delete_10GE_service(self):
    reply = test_utils.service_delete_request("service1-10GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    message = payload['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
# The service list must contain exactly 3 services.
def test_053_check_service_list(self):
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    services = payload['service-list']['services']
    self.assertEqual(len(services), 3)
# The device configuration read from SPDR-SA1 must not contain any
# 'odu-connection' key.
def test_054_check_no_ODU2e_connection_spdra(self):
    response = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership is tested against the keys of 'org-openroadm-device'.
    self.assertNotIn('odu-connection', res['org-openroadm-device'])
# Reading interface XPDR1-NETWORK1-ODU2e on SPDR-SA1 must answer with the
# "conflict" HTTP status.
def test_055_check_no_interface_ODU2E_NETWORK_spdra(self):
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e")
    status = reply.status_code
    self.assertEqual(status, requests.codes.conflict)
# Reading interface XPDR1-CLIENT1-ODU2e on SPDR-SA1 must answer with the
# "conflict" HTTP status.
def test_056_check_no_interface_ODU2E_CLIENT_spdra(self):
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e")
    status = reply.status_code
    self.assertEqual(status, requests.codes.conflict)
# Reading interface XPDR1-CLIENT1-ETHERNET10G on SPDR-SA1 must answer with
# the "conflict" HTTP status.
def test_057_check_no_interface_10GE_CLIENT_spdra(self):
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
    status = reply.status_code
    self.assertEqual(status, requests.codes.conflict)
# The OTN topology must expose 6 links; the two ODTU4 links between SPDR-SA1
# and SPDR-SC1 must report 100000 available and 0 used bandwidth.
def test_058_check_otn_topo_links(self):
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()
    link_count = len(topology['network'][0]['ietf-network-topology:link'])
    self.assertEqual(link_count, 6)
    odtu4_link_ids = (
        'ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')
    for link in topology['network'][0]['ietf-network-topology:link']:
        if link['link-id'] not in odtu4_link_ids:
            continue
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
# Check the XPDR1-NETWORK1 termination point of SPDR-SA1-XPDR1 and
# SPDR-SC1-XPDR1 in the OTN topology: 80 entries in 'ts-pool' and 80 in the
# first 'tpn-pool'.
def test_059_check_otn_topo_tp(self):
    response = test_utils.get_otn_topo_request()
    # Stop on an HTTP error instead of failing later on a missing key.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
# Ask for the deletion of "service-ODU4-ABC", check the acknowledgement
# message, then pause for WAITING seconds.
def test_060_delete_ODU4_service(self):
    reply = test_utils.service_delete_request("service-ODU4-ABC")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    message = payload['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
# The service list must contain exactly 2 services.
def test_061_check_service_list(self):
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    services = payload['service-list']['services']
    self.assertEqual(len(services), 2)
# Reading interface XPDR1-NETWORK1-ODU4 on SPDR-SA1 must answer with the
# "conflict" HTTP status.
def test_062_check_no_interface_ODU4_spdra(self):
    reply = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
    status = reply.status_code
    self.assertEqual(status, requests.codes.conflict)
# Reading either ODU4 network interface of XPDR2 on SPDR-SB1 must answer
# with the "conflict" HTTP status.
def test_063_check_no_interface_ODU4_spdrb(self):
    for resource in ("interface/XPDR2-NETWORK1-ODU4",
                     "interface/XPDR2-NETWORK2-ODU4"):
        reply = test_utils.check_netconf_node_request("SPDR-SB1", resource)
        self.assertEqual(reply.status_code, requests.codes.conflict)
# The device configuration read from SPDR-SB1 must not contain any
# 'odu-connection' key.
def test_064_check_no_ODU4_connection_spdrb(self):
    response = test_utils.check_netconf_node_request("SPDR-SB1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership is tested against the keys of 'org-openroadm-device'.
    self.assertNotIn('odu-connection', res['org-openroadm-device'])
# Reading interface XPDR1-NETWORK1-ODU4 on SPDR-SC1 must answer with the
# "conflict" HTTP status.
def test_065_check_no_interface_ODU4_spdrc(self):
    reply = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
    status = reply.status_code
    self.assertEqual(status, requests.codes.conflict)
# Re-run the link checks of test_030_check_otn_topo_otu4_links at this point
# of the scenario.
def test_066_check_otn_topo_links(self):
    otu4_link_check = self.test_030_check_otn_topo_otu4_links
    otu4_link_check()
# Walk the OTN topology nodes SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1 and inspect
# the connection attributes of their XPDR1-NETWORK1 termination point:
# 'ts-pool' must not be one of its keys.
# NOTE(review): the loop header over tpList and the opener of the last
# assertion (on 'odtu-tpn-pool') are not visible in this copy of the file -
# TODO restore them; the HTTP status of the reply is not checked either.
1106 def test_067_check_otn_topo_tp(self):
1107 response = test_utils.get_otn_topo_request()
1108 res = response.json()
1109 for node in res['network'][0]['node']:
1110 if node['node-id'] == 'SPDR-SA1-XPDR1' or node['node-id'] == 'SPDR-SC1-XPDR1':
1111 tpList = node['ietf-network-topology:termination-point']
1113 if tp['tp-id'] == 'XPDR1-NETWORK1':
1114 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
# dict.keys(d) is the unbound-method spelling of d.keys().
1115 self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
1117 'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
# test service-create for ODU4 service from spdrA to spdrB
# The shared class-level request template is edited in place, so the values
# written here are still present when later tests reuse the template.
def test_068_create_ODU4_service_AB(self):
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service-ODU4-AB"
    request["connection-type"] = "infrastructure"

    a_end = request["service-a-end"]
    a_end["service-rate"] = "100"
    a_end["service-format"] = "ODU"
    a_end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
    for direction in ("tx-direction", "rx-direction"):
        a_end[direction][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR1"
        a_end[direction][0]["port"]["port-name"] = "XPDR1-NETWORK1"

    z_end = request["service-z-end"]
    z_end["node-id"] = "SPDR-SB1"
    z_end["clli"] = "NodeSB"
    # The rate has to be set on the z-end as well: the a-end was already
    # handled above, and the template may still hold an earlier z-end rate.
    z_end["service-rate"] = "100"
    z_end["service-format"] = "ODU"
    z_end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
    for direction in ("tx-direction", "rx-direction"):
        z_end[direction][0]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
        z_end[direction][0]["port"]["port-name"] = "XPDR2-NETWORK1"

    response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('PCE calculation in progress',
                  res['output']['configuration-response-common']['response-message'])
    time.sleep(self.WAITING)
# Fetch "service-ODU4-AB" from the service list and check its
# administrative state, name, connection type and lifecycle state.
def test_069_get_ODU4_service_AB(self):
    reply = test_utils.get_service_list_request(
        "services/service-ODU4-AB")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    expected_fields = (
        ('administrative-state', 'inService'),
        ('service-name', 'service-ODU4-AB'),
        ('connection-type', 'infrastructure'),
        ('lifecycle-state', 'planned'),
    )
    for field, expected in expected_fields:
        self.assertEqual(payload['services'][0][field], expected)
# Check the ODU4 network interface on SPDR-SA1, then compare its SAPI/DAPI
# values with the ones read back from XPDR2-NETWORK1-ODU4 on SPDR-SB1.
def test_070_check_interface_ODU4_spdra(self):
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-CFP0-P1'}
    # SAPI/DAPI are added in the Otu4 renderer
    input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
                    'rate': 'org-openroadm-otn-common-types:ODU4',
                    'expected-dapi': 'H/OelLynehI=',
                    'expected-sapi': 'X+8cRNi+HbE=',
                    'tx-dapi': 'X+8cRNi+HbE=',
                    'tx-sapi': 'H/OelLynehI='}

    # NOTE(review): the returned values override input_dict_1 in this merge,
    # so only the presence of the expected keys is verified.
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    # Here input_dict_2 overrides the returned values, so values are compared.
    self.assertDictEqual(dict(odu, **input_dict_2), odu)
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'}, odu['opu'])

    response2 = test_utils.check_netconf_node_request(
        "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
    # Validate the second reply itself before decoding it.
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    # The SPDR-SB1 side must carry the swapped SAPI/DAPI pair.
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
# Check the XPDR2-NETWORK1 ODU4 interface on SPDR-SB1, then compare its
# SAPI/DAPI values with the ones read back from XPDR1-NETWORK1-ODU4 on SPDR-SA1.
def test_071_check_interface_ODU4_spdrb_N1(self):
    response = test_utils.check_netconf_node_request(
        "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-CFP0-P1'}
    # SAPI/DAPI are added in the Otu4 renderer
    input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
                    'rate': 'org-openroadm-otn-common-types:ODU4',
                    'expected-dapi': 'X+8cRNi+HbE=',
                    'expected-sapi': 'H/OelLynehI=',
                    'tx-dapi': 'H/OelLynehI=',
                    'tx-sapi': 'X+8cRNi+HbE='}

    # NOTE(review): the returned values override input_dict_1 in this merge,
    # so only the presence of the expected keys is verified.
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    # Here input_dict_2 overrides the returned values, so values are compared.
    self.assertDictEqual(dict(odu, **input_dict_2), odu)
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'}, odu['opu'])

    response2 = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
    # Validate the second reply itself before decoding it.
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    # The SPDR-SA1 side must carry the swapped SAPI/DAPI pair.
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
# test service-create for ODU4 service from spdrB to spdrC
# Only the fields that differ from the previous request are rewritten in the
# shared class-level template before it is posted.
def test_072_create_ODU4_service_BC(self):
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service-ODU4-BC"

    a_end = request["service-a-end"]
    for direction in ("tx-direction", "rx-direction"):
        a_end[direction][0]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
        a_end[direction][0]["port"]["port-name"] = "XPDR2-NETWORK2"

    z_end = request["service-z-end"]
    z_end["node-id"] = "SPDR-SC1"
    z_end["clli"] = "NodeSC"
    for direction in ("tx-direction", "rx-direction"):
        z_end[direction][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR1"
        z_end[direction][0]["port"]["port-name"] = "XPDR1-NETWORK1"

    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    message = payload['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', message)
    time.sleep(self.WAITING)
# Fetch "service-ODU4-BC" from the service list and check its
# administrative state, name, connection type and lifecycle state.
# NOTE(review): the method name says AB although the BC service is read.
def test_073_get_ODU4_service_AB(self):
    reply = test_utils.get_service_list_request(
        "services/service-ODU4-BC")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    expected_fields = (
        ('administrative-state', 'inService'),
        ('service-name', 'service-ODU4-BC'),
        ('connection-type', 'infrastructure'),
        ('lifecycle-state', 'planned'),
    )
    for field, expected in expected_fields:
        self.assertEqual(payload['services'][0][field], expected)
# Check the XPDR2-NETWORK2 ODU4 interface on SPDR-SB1, then compare its
# SAPI/DAPI values with the ones read back from XPDR1-NETWORK1-ODU4 on SPDR-SC1.
def test_074_check_interface_ODU4_spdrb_N2(self):
    response = test_utils.check_netconf_node_request(
        "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    # The expected name matches the interface that is actually requested.
    input_dict_1 = {'name': 'XPDR2-NETWORK2-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-CFP0-P1'}
    # SAPI/DAPI are added in the Otu4 renderer
    input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
                    'rate': 'org-openroadm-otn-common-types:ODU4',
                    'expected-dapi': 'X+8cRNi+HbI=',
                    'expected-sapi': 'AMf1n5hK6Xkk',
                    'tx-dapi': 'AMf1n5hK6Xkk',
                    'tx-sapi': 'X+8cRNi+HbI='}

    # NOTE(review): the returned values override input_dict_1 in this merge,
    # so only the presence of the expected keys is verified.
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    # Here input_dict_2 overrides the returned values, so values are compared.
    self.assertDictEqual(dict(odu, **input_dict_2), odu)
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'}, odu['opu'])

    response2 = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
    # Validate the second reply itself before decoding it.
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    # The SPDR-SC1 side must carry the swapped SAPI/DAPI pair.
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
# Check the ODU4 network interface on SPDR-SC1, then compare its SAPI/DAPI
# values with the ones read back from XPDR2-NETWORK2-ODU4 on SPDR-SB1.
def test_075_check_interface_ODU4_spdrc(self):
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP1-CFP0',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP1-CFP0-P1'}
    # SAPI/DAPI are added in the Otu4 renderer
    input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
                    'rate': 'org-openroadm-otn-common-types:ODU4',
                    'expected-dapi': 'AMf1n5hK6Xkk',
                    'expected-sapi': 'X+8cRNi+HbI=',
                    'tx-dapi': 'X+8cRNi+HbI=',
                    'tx-sapi': 'AMf1n5hK6Xkk'}
    # NOTE(review): the returned values override input_dict_1 in this merge,
    # so only the presence of the expected keys is verified.
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    # Here input_dict_2 overrides the returned values, so values are compared.
    self.assertDictEqual(dict(odu, **input_dict_2), odu)
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'}, odu['opu'])

    response2 = test_utils.check_netconf_node_request(
        "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
    # Validate the second reply itself before decoding it.
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    # The SPDR-SB1 side must carry the swapped SAPI/DAPI pair.
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
# test service-create for 10GE service from spdr to spdr
# The shared class-level template is turned into an Ethernet request: rates
# and formats are rewritten, 'odu-service-rate' is removed from both ends and
# the ports are switched to XPDR1-CLIENT1.
def test_076_create_10GE_service_ABC(self):
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service1-10GE"
    request["connection-type"] = "service"

    a_end = request["service-a-end"]
    a_end["node-id"] = "SPDR-SA1"
    a_end["clli"] = "NodeSA"
    a_end["service-rate"] = "10"
    a_end["service-format"] = "Ethernet"
    del a_end["odu-service-rate"]
    for direction in ("tx-direction", "rx-direction"):
        a_end[direction][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR1"
        a_end[direction][0]["port"]["port-name"] = "XPDR1-CLIENT1"

    z_end = request["service-z-end"]
    z_end["service-rate"] = "10"
    z_end["service-format"] = "Ethernet"
    del z_end["odu-service-rate"]
    for direction in ("tx-direction", "rx-direction"):
        z_end[direction][0]["port"]["port-name"] = "XPDR1-CLIENT1"

    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    message = payload['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', message)
    time.sleep(self.WAITING)
# Replay, in order, the earlier service and interface checks for SPDR-SA1
# and SPDR-SC1 (test_048 is not part of the replay).
def test_077_check_configuration_spdra_spdrc(self):
    replayed_checks = (
        self.test_041_get_10GE_service1,
        self.test_042_check_interface_10GE_CLIENT_spdra,
        self.test_043_check_interface_ODU2E_CLIENT_spdra,
        self.test_044_check_interface_ODU2E_NETWORK_spdra,
        self.test_045_check_ODU2E_connection_spdra,
        self.test_046_check_interface_10GE_CLIENT_spdrc,
        self.test_047_check_interface_ODU2E_CLIENT_spdrc,
        self.test_049_check_ODU2E_connection_spdrc,
    )
    for check in replayed_checks:
        check()
# Check the XPDR2-NETWORK1 ODU2e interface on SPDR-SB1 and its
# 'parent-odu-allocation' ('trib-port-number' key, slot 1 in 'trib-slots').
def test_078_check_interface_ODU2E_NETWORK1_spdrb(self):
    reply = test_utils.check_netconf_node_request(
        "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU2e")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    expected_interface = {'name': 'XPDR2-NETWORK1-ODU2e',
                          'administrative-state': 'inService',
                          'supporting-circuit-pack-name': 'CP1-CFP0',
                          'supporting-interface': 'XPDR2-NETWORK1-ODU4',
                          'type': 'org-openroadm-interfaces:otnOdu',
                          'supporting-port': 'CP1-CFP0-P1'}
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU2e',
        'monitoring-mode': 'monitored'}
    expected_allocation = {'trib-port-number': 1}

    interface = payload['interface'][0]
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    allocation = odu['parent-odu-allocation']
    self.assertDictEqual(dict(expected_allocation, **allocation), allocation)
    self.assertIn(1, allocation['trib-slots'])
# Check the XPDR2-NETWORK2 ODU2e interface on SPDR-SB1 and its
# 'parent-odu-allocation' ('trib-port-number' key, slot 1 in 'trib-slots').
def test_079_check_interface_ODU2E_NETWORK2_spdrb(self):
    reply = test_utils.check_netconf_node_request(
        "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU2e")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    expected_interface = {'name': 'XPDR2-NETWORK2-ODU2e',
                          'administrative-state': 'inService',
                          'supporting-circuit-pack-name': 'CP1-CFP0',
                          'supporting-interface': 'XPDR2-NETWORK2-ODU4',
                          'type': 'org-openroadm-interfaces:otnOdu',
                          'supporting-port': 'CP1-CFP0-P1'}
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU2e',
        'monitoring-mode': 'monitored'}
    expected_allocation = {'trib-port-number': 1}

    interface = payload['interface'][0]
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    allocation = odu['parent-odu-allocation']
    self.assertDictEqual(dict(expected_allocation, **allocation), allocation)
    self.assertIn(1, allocation['trib-slots'])
# Read the odu-connection "XPDR2-NETWORK1-ODU2e-x-XPDR2-NETWORK2-ODU2e" through
# test_utils.check_netconf_node_request, require requests.codes.ok, then check
# the first 'odu-connection' entry: input_dict_1 merged with it must equal it,
# 'destination' must be {'dst-if': 'XPDR2-NETWORK2-ODU2e'} and 'source' must be
# {'src-if': 'XPDR2-NETWORK1-ODU2e'}.
# NOTE(review): the first argument of the request and the statements opening
# and closing input_dict_1 are not visible in this copy - TODO restore them
# (presumably the node is SPDR-SB1, judging by the method name; confirm).
1437 def test_080_check_ODU2E_connection_spdrb(self):
1438 response = test_utils.check_netconf_node_request(
1440 "odu-connection/XPDR2-NETWORK1-ODU2e-x-XPDR2-NETWORK2-ODU2e")
1441 self.assertEqual(response.status_code, requests.codes.ok)
1442 res = response.json()
1445 'XPDR2-NETWORK1-ODU2e-x-XPDR2-NETWORK2-ODU2e',
1446 'direction': 'bidirectional'
# The returned entry overrides input_dict_1 in this merge, so only the
# presence of the expected keys is verified.
1449 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1450 res['odu-connection'][0])
1451 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU2e'},
1452 res['odu-connection'][0]['destination'])
1453 self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU2e'},
1454 res['odu-connection'][0]['source'])
# The OTN topology must expose 8 links; the four ODTU4 links on the
# SPDR-SA1 / SPDR-SB1 / SPDR-SC1 path must report 90000 available and
# 10000 used bandwidth.
def test_081_check_otn_topo_links(self):
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()
    link_count = len(topology['network'][0]['ietf-network-topology:link'])
    self.assertEqual(link_count, 8)
    odtu4_link_ids = (
        'ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
        'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
        'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2',
    )
    for link in topology['network'][0]['ietf-network-topology:link']:
        if link['link-id'] not in odtu4_link_ids:
            continue
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
# Re-run the termination-point checks of test_051_check_otn_topo_tp at this
# point of the scenario.
def test_082_check_otn_topo_tp(self):
    termination_point_check = self.test_051_check_otn_topo_tp
    termination_point_check()
# Ask for the deletion of "service1-10GE", check the acknowledgement message,
# then pause for WAITING seconds.
def test_083_delete_10GE_service(self):
    reply = test_utils.service_delete_request("service1-10GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    message = payload['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
# The service list must contain exactly 4 services.
def test_084_check_service_list(self):
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    services = payload['service-list']['services']
    self.assertEqual(len(services), 4)
# Replay, in order, the earlier "no connection / no interface" checks for
# SPDR-SA1.
def test_085_check_configuration_spdra(self):
    replayed_checks = (
        self.test_054_check_no_ODU2e_connection_spdra,
        self.test_055_check_no_interface_ODU2E_NETWORK_spdra,
        self.test_056_check_no_interface_ODU2E_CLIENT_spdra,
        self.test_057_check_no_interface_10GE_CLIENT_spdra,
    )
    for check in replayed_checks:
        check()
def test_086_check_no_ODU2e_connection_spdrb(self):
    """Check that the SPDR-SB1 device data has no 'odu-connection' entry."""
    reply = test_utils.check_netconf_node_request("SPDR-SB1", "")
    self.assertEqual(reply.status_code, requests.codes.ok)
    device = reply.json()['org-openroadm-device']
    # The key is looked up by its plain name in the device container.
    self.assertNotIn('odu-connection', device)
def test_087_check_no_interface_ODU2E_NETWORK1_spdrb(self):
    """Expect the conflict status when reading XPDR2-NETWORK1-ODU2e on SPDR-SB1."""
    node_id = "SPDR-SB1"
    resource = "interface/XPDR2-NETWORK1-ODU2e"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_088_check_no_interface_ODU2E_NETWORK2_spdrb(self):
    """Expect the conflict status when reading XPDR2-NETWORK2-ODU2e on SPDR-SB1."""
    node_id = "SPDR-SB1"
    resource = "interface/XPDR2-NETWORK2-ODU2e"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_089_check_otn_topo_links(self):
    """Check the OTN topology link count and the ODTU4 link bandwidth.

    The OTN topology request must succeed and return 8 links. Each of the
    four ODTU4 links listed below must report an available-bandwidth of
    100000 and a used-bandwidth of 0.
    """
    odtu4_link_ids = (
        'ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
        'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
        'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2',
    )
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    # Fetch the link list once; it is used for both the count and the loop.
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 8)
    for link in links:
        if link['link-id'] not in odtu4_link_ids:
            continue
        # NOTE(review): the opening of these two calls was not visible in the
        # reviewed text; assertEqual is reconstructed from the dangling
        # "(value, expected)" argument lists - confirm.
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_090_check_otn_topo_tp(self):
    """Re-run the checks implemented by ``test_059_check_otn_topo_tp``."""
    # Pure delegation: the assertions live in the earlier test method.
    self.test_059_check_otn_topo_tp()
def test_091_delete_ODU4_service_AB(self):
    """Request deletion of ``service-ODU4-AB`` and pause for ``WAITING`` seconds.

    The request must return the OK status code and the response message
    must contain 'Renderer service delete in progress'.
    """
    reply = test_utils.service_delete_request("service-ODU4-AB")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    time.sleep(self.WAITING)
def test_092_delete_ODU4_service_BC(self):
    """Request deletion of ``service-ODU4-BC`` and pause for ``WAITING`` seconds.

    The request must return the OK status code and the response message
    must contain 'Renderer service delete in progress'.
    """
    reply = test_utils.service_delete_request("service-ODU4-BC")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    time.sleep(self.WAITING)
def test_093_check_global_config(self):
    """Re-run the checks implemented by tests 061 to 067, in order.

    The first delegated method that fails aborts this test.
    """
    # Service list.
    self.test_061_check_service_list()
    # ODU4 interfaces and connection on the three SPDR nodes.
    self.test_062_check_no_interface_ODU4_spdra()
    self.test_063_check_no_interface_ODU4_spdrb()
    self.test_064_check_no_ODU4_connection_spdrb()
    self.test_065_check_no_interface_ODU4_spdrc()
    # OTN topology links and termination-point checks.
    self.test_066_check_otn_topo_links()
    self.test_067_check_otn_topo_tp()
def test_094_delete_OCH_OTU4_service_AB(self):
    """Request deletion of ``service-OCH-OTU4-AB`` and pause for ``WAITING`` seconds.

    The request must return the OK status code and the response message
    must contain 'Renderer service delete in progress'.
    """
    reply = test_utils.service_delete_request("service-OCH-OTU4-AB")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    time.sleep(self.WAITING)
def test_095_delete_OCH_OTU4_service_BC(self):
    """Request deletion of ``service-OCH-OTU4-BC`` and pause for ``WAITING`` seconds.

    The request must return the OK status code and the response message
    must contain 'Renderer service delete in progress'.
    """
    reply = test_utils.service_delete_request("service-OCH-OTU4-BC")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    time.sleep(self.WAITING)
def test_096_get_no_service(self):
    """Check that the service list request now answers with a data-missing error.

    The request must return the conflict status code and the error list of
    the response must contain the expected "data-missing" application error.
    """
    expected_error = {
        "error-type": "application",
        "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant "
                         "data model content does not exist",
    }
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.conflict)
    res = response.json()
    # NOTE(review): the opening of this call was not visible in the reviewed
    # text; assertIn is reconstructed from the dangling "(item, container)"
    # argument list - confirm.
    self.assertIn(expected_error, res['errors']['error'])
def test_097_check_no_interface_OTU4_spdra(self):
    """Expect the conflict status when reading XPDR1-NETWORK1-OTU on SPDR-SA1."""
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-OTU"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_098_check_no_interface_OCH_spdra(self):
    """Expect the conflict status when reading XPDR1-NETWORK1-1 on SPDR-SA1."""
    node_id = "SPDR-SA1"
    resource = "interface/XPDR1-NETWORK1-1"
    reply = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(reply.status_code, requests.codes.conflict)
def test_099_getLinks_OtnTopology(self):
    """Check that the OTN topology no longer carries any link entry."""
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_100_disconnect_xponders_from_roadm(self):
    """Delete every XPONDER-OUTPUT / XPONDER-INPUT link of the openroadm topology.

    The topology is fetched first; each link whose link-type is
    XPONDER-OUTPUT or XPONDER-INPUT is then deleted by link-id, and every
    delete request must return the OK status code.
    """
    # The "{}" placeholder is left as-is here; presumably
    # test_utils.delete_request fills it in - verify against the helper.
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    xponder_link_types = ("XPONDER-OUTPUT", "XPONDER-INPUT")
    response = test_utils.get_ordm_topo_request("")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    # NOTE(review): the loop header was not visible in the reviewed text; it
    # is reconstructed from the body's use of `link` and the otherwise
    # unused `links` list - confirm.
    for link in links:
        if link["org-openroadm-common-network:link-type"] not in xponder_link_types:
            continue
        link_name = link["link-id"]
        response = test_utils.delete_request(url + link_name)
        self.assertEqual(response.status_code, requests.codes.ok)
def test_101_check_openroadm_topology(self):
    """Check that the openroadm topology request succeeds and holds 28 links."""
    reply = test_utils.get_ordm_topo_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology_links = reply.json()['network'][0]['ietf-network-topology:link']
    link_count = len(topology_links)
    self.assertEqual(28, link_count, 'Topology should contain 28 links')
def test_102_disconnect_spdrA(self):
    """Unmount device SPDR-SA1 and expect the OK status code."""
    reply = test_utils.unmount_device("SPDR-SA1")
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_103_disconnect_spdrC(self):
    """Unmount device SPDR-SC1 and expect the OK status code."""
    reply = test_utils.unmount_device("SPDR-SC1")
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_104_disconnect_spdrB(self):
    """Unmount device SPDR-SB1 and expect the OK status code."""
    reply = test_utils.unmount_device("SPDR-SB1")
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_105_disconnect_roadmA(self):
    """Unmount device ROADM-A1 and expect the OK status code."""
    reply = test_utils.unmount_device("ROADM-A1")
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_106_disconnect_roadmB(self):
    """Unmount device ROADM-B1 and expect the OK status code."""
    reply = test_utils.unmount_device("ROADM-B1")
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_107_disconnect_roadmC(self):
    """Unmount device ROADM-C1 and expect the OK status code."""
    reply = test_utils.unmount_device("ROADM-C1")
    status = reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
# Script entry point: run the suite with per-test output (verbosity level 2)
# when this file is executed directly rather than imported.
if __name__ == "__main__":
    unittest.main(verbosity=2)