3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
19 # pylint: disable=wrong-import-order
21 sys.path.append('transportpce_tests/common/')
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils # nopep8
27 class TransportPCEtesting(unittest.TestCase):
30 WAITING = 20 # nominal value is 300
31 NODE_VERSION = '2.2.1'
33 cr_serv_sample_data = {"input": {
34 "sdnc-request-header": {
35 "request-id": "request-1",
36 "rpc-action": "service-create",
37 "request-system-id": "appname"
39 "service-name": "service-OCH-OTU4-AB",
40 "common-id": "commonId",
41 "connection-type": "infrastructure",
43 "service-rate": "100",
44 "node-id": "SPDR-SA1",
45 "service-format": "OTU",
46 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
50 "committed-info-rate": "100000",
51 "committed-burst-size": "64"
56 "port-device-name": "SPDR-SA1-XPDR1",
58 "port-name": "XPDR1-NETWORK1",
59 "port-rack": "000000.00",
60 "port-shelf": "Chassis#1"
63 "lgx-device-name": "Some lgx-device-name",
64 "lgx-port-name": "Some lgx-port-name",
65 "lgx-port-rack": "000000.00",
66 "lgx-port-shelf": "00"
71 "port-device-name": "SPDR-SA1-XPDR1",
73 "port-name": "XPDR1-NETWORK1",
74 "port-rack": "000000.00",
75 "port-shelf": "Chassis#1"
78 "lgx-device-name": "Some lgx-device-name",
79 "lgx-port-name": "Some lgx-port-name",
80 "lgx-port-rack": "000000.00",
81 "lgx-port-shelf": "00"
87 "service-rate": "100",
88 "node-id": "SPDR-SB1",
89 "service-format": "OTU",
90 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
94 "committed-info-rate": "100000",
95 "committed-burst-size": "64"
100 "port-device-name": "SPDR-SB1-XPDR2",
101 "port-type": "fixed",
102 "port-name": "XPDR2-NETWORK1",
103 "port-rack": "000000.00",
104 "port-shelf": "Chassis#1"
107 "lgx-device-name": "Some lgx-device-name",
108 "lgx-port-name": "Some lgx-port-name",
109 "lgx-port-rack": "000000.00",
110 "lgx-port-shelf": "00"
115 "port-device-name": "SPDR-SB1-XPDR2",
116 "port-type": "fixed",
117 "port-name": "XPDR2-NETWORK1",
118 "port-rack": "000000.00",
119 "port-shelf": "Chassis#1"
122 "lgx-device-name": "Some lgx-device-name",
123 "lgx-port-name": "Some lgx-port-name",
124 "lgx-port-rack": "000000.00",
125 "lgx-port-shelf": "00"
130 "due-date": "2018-06-15T00:00:01Z",
131 "operator-contact": "pw1234"
137 cls.processes = test_utils.start_tpce()
138 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
139 ('spdrb', cls.NODE_VERSION),
140 ('spdrc', cls.NODE_VERSION),
141 ('roadma', cls.NODE_VERSION),
142 ('roadmb', cls.NODE_VERSION),
143 ('roadmc', cls.NODE_VERSION)])
146 def tearDownClass(cls):
147 # pylint: disable=not-an-iterable
148 for process in cls.processes:
149 test_utils.shutdown_process(process)
150 print("all processes killed")
155 def test_001_connect_spdrA(self):
156 response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
157 self.assertEqual(response.status_code,
158 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160 def test_002_connect_spdrB(self):
161 response = test_utils.mount_device("SPDR-SB1", ('spdrb', self.NODE_VERSION))
162 self.assertEqual(response.status_code,
163 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
165 def test_003_connect_spdrC(self):
166 response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
167 self.assertEqual(response.status_code,
168 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
170 def test_004_connect_rdmA(self):
171 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
172 self.assertEqual(response.status_code,
173 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
175 def test_005_connect_rdmB(self):
176 response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
177 self.assertEqual(response.status_code,
178 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
180 def test_006_connect_rdmC(self):
181 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
182 self.assertEqual(response.status_code,
183 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
185 def test_007_connect_sprdA_1_N1_to_roadmA_PP1(self):
186 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
187 "ROADM-A1", "1", "SRG1-PP1-TXRX")
188 self.assertEqual(response.status_code, requests.codes.ok)
189 res = response.json()
190 self.assertIn('Xponder Roadm Link created successfully',
191 res["output"]["result"])
194 def test_008_connect_roadmA_PP1_to_spdrA_1_N1(self):
195 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
196 "ROADM-A1", "1", "SRG1-PP1-TXRX")
197 self.assertEqual(response.status_code, requests.codes.ok)
198 res = response.json()
199 self.assertIn('Roadm Xponder links created successfully',
200 res["output"]["result"])
203 def test_009_connect_sprdC_1_N1_to_roadmC_PP1(self):
204 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
205 "ROADM-C1", "1", "SRG1-PP1-TXRX")
206 self.assertEqual(response.status_code, requests.codes.ok)
207 res = response.json()
208 self.assertIn('Xponder Roadm Link created successfully',
209 res["output"]["result"])
212 def test_010_connect_roadmC_PP1_to_spdrC_1_N1(self):
213 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
214 "ROADM-C1", "1", "SRG1-PP1-TXRX")
215 self.assertEqual(response.status_code, requests.codes.ok)
216 res = response.json()
217 self.assertIn('Roadm Xponder links created successfully',
218 res["output"]["result"])
221 def test_011_connect_sprdB_2_N1_to_roadmB_PP1(self):
222 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "1",
223 "ROADM-B1", "1", "SRG1-PP1-TXRX")
224 self.assertEqual(response.status_code, requests.codes.ok)
225 res = response.json()
226 self.assertIn('Xponder Roadm Link created successfully',
227 res["output"]["result"])
230 def test_012_connect_roadmB_PP1_to_spdrB_2_N1(self):
231 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "1",
232 "ROADM-B1", "1", "SRG1-PP1-TXRX")
233 self.assertEqual(response.status_code, requests.codes.ok)
234 res = response.json()
235 self.assertIn('Roadm Xponder links created successfully',
236 res["output"]["result"])
239 def test_013_connect_sprdB_2_N2_to_roadmB_PP2(self):
240 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "2",
241 "ROADM-B1", "1", "SRG1-PP2-TXRX")
242 self.assertEqual(response.status_code, requests.codes.ok)
243 res = response.json()
244 self.assertIn('Xponder Roadm Link created successfully',
245 res["output"]["result"])
248 def test_014_connect_roadmB_PP2_to_spdrB_2_N2(self):
249 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "2",
250 "ROADM-B1", "1", "SRG1-PP2-TXRX")
251 self.assertEqual(response.status_code, requests.codes.ok)
252 res = response.json()
253 self.assertIn('Roadm Xponder links created successfully',
254 res["output"]["result"])
257 def test_015_add_omsAttributes_ROADMA_ROADMB(self):
258 # Config ROADMA-ROADMC oms-attributes
260 "auto-spanloss": "true",
261 "spanloss-base": 11.4,
262 "spanloss-current": 12,
263 "engineered-spanloss": 12.2,
264 "link-concatenation": [{
267 "SRLG-length": 100000,
269 response = test_utils.add_oms_attr_request(
270 "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
271 self.assertEqual(response.status_code, requests.codes.created)
273 def test_016_add_omsAttributes_ROADMB_ROADMA(self):
274 # Config ROADMC-ROADMA oms-attributes
276 "auto-spanloss": "true",
277 "spanloss-base": 11.4,
278 "spanloss-current": 12,
279 "engineered-spanloss": 12.2,
280 "link-concatenation": [{
283 "SRLG-length": 100000,
285 response = test_utils.add_oms_attr_request(
286 "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
287 self.assertEqual(response.status_code, requests.codes.created)
289 def test_017_add_omsAttributes_ROADMB_ROADMC(self):
290 # Config ROADMA-ROADMC oms-attributes
292 "auto-spanloss": "true",
293 "spanloss-base": 11.4,
294 "spanloss-current": 12,
295 "engineered-spanloss": 12.2,
296 "link-concatenation": [{
299 "SRLG-length": 100000,
301 response = test_utils.add_oms_attr_request(
302 "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
303 self.assertEqual(response.status_code, requests.codes.created)
305 def test_018_add_omsAttributes_ROADMC_ROADMB(self):
306 # Config ROADMC-ROADMA oms-attributes
308 "auto-spanloss": "true",
309 "spanloss-base": 11.4,
310 "spanloss-current": 12,
311 "engineered-spanloss": 12.2,
312 "link-concatenation": [{
315 "SRLG-length": 100000,
317 response = test_utils.add_oms_attr_request(
318 "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
319 self.assertEqual(response.status_code, requests.codes.created)
321 def test_019_create_OTS_ROADMA_DEG1(self):
322 response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
324 self.assertEqual(response.status_code, requests.codes.ok)
325 res = response.json()
326 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
327 res["output"]["result"])
329 def test_020_create_OTS_ROADMB_DEG1(self):
330 response = test_utils.create_ots_oms_request("ROADM-B1", "DEG1-TTP-TXRX")
332 self.assertEqual(response.status_code, requests.codes.ok)
333 res = response.json()
334 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1',
335 res["output"]["result"])
337 def test_021_create_OTS_ROADMB_DEG2(self):
338 response = test_utils.create_ots_oms_request("ROADM-B1", "DEG2-TTP-TXRX")
340 self.assertEqual(response.status_code, requests.codes.ok)
341 res = response.json()
342 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1',
343 res["output"]["result"])
345 def test_022_create_OTS_ROADMC_DEG2(self):
346 response = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
348 self.assertEqual(response.status_code, requests.codes.ok)
349 res = response.json()
350 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
351 res["output"]["result"])
353 def test_023_calculate_span_loss_base_all(self):
354 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
360 response = test_utils.post_request(url, data)
361 self.assertEqual(response.status_code, requests.codes.ok)
362 res = response.json()
363 self.assertIn('Success',
364 res["output"]["result"])
367 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
368 }, res["output"]["spans"])
371 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
372 }, res["output"]["spans"])
375 "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
376 }, res["output"]["spans"])
379 "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
380 }, res["output"]["spans"])
383 "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
384 }, res["output"]["spans"])
387 "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
388 }, res["output"]["spans"])
391 def test_024_check_otn_topology(self):
392 response = test_utils.get_otn_topo_request()
393 self.assertEqual(response.status_code, requests.codes.ok)
394 res = response.json()
395 nbNode = len(res['network'][0]['node'])
396 self.assertEqual(nbNode, 9, 'There should be 9 nodes')
397 self.assertNotIn('ietf-network-topology:link', res['network'][0],
398 'otn-topology should have no link')
400 # test service-create for OCH-OTU4 service from spdrA to spdrB
401 def test_025_create_OCH_OTU4_service_AB(self):
402 response = test_utils.service_create_request(self.cr_serv_sample_data)
403 self.assertEqual(response.status_code, requests.codes.ok)
404 res = response.json()
405 self.assertIn('PCE calculation in progress',
406 res['output']['configuration-response-common']['response-message'])
407 time.sleep(self.WAITING)
409 def test_026_get_OCH_OTU4_service_AB(self):
410 response = test_utils.get_service_list_request(
411 "services/service-OCH-OTU4-AB")
412 self.assertEqual(response.status_code, requests.codes.ok)
413 res = response.json()
415 res['services'][0]['administrative-state'], 'inService')
417 res['services'][0]['service-name'], 'service-OCH-OTU4-AB')
419 res['services'][0]['connection-type'], 'infrastructure')
421 res['services'][0]['lifecycle-state'], 'planned')
424 def test_027_check_otn_topo_otu4_links(self):
425 response = test_utils.get_otn_topo_request()
426 self.assertEqual(response.status_code, requests.codes.ok)
427 res = response.json()
428 nb_links = len(res['network'][0]['ietf-network-topology:link'])
429 self.assertEqual(nb_links, 2)
430 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
431 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
432 for link in res['network'][0]['ietf-network-topology:link']:
433 self.assertIn(link['link-id'], listLinkId)
435 link['transportpce-topology:otn-link-type'], 'OTU4')
437 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
439 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
441 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
443 link['org-openroadm-common-network:opposite-link'], listLinkId)
445 # test service-create for OCH-OTU4 service from spdrB to spdrC
446 def test_028_create_OCH_OTU4_service_BC(self):
447 # pylint: disable=line-too-long
448 self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-BC"
449 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SB1"
450 self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSB"
451 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
452 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
453 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
454 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
455 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
456 self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
457 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1"
458 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1"
459 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1"
460 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1"
462 response = test_utils.service_create_request(self.cr_serv_sample_data)
463 self.assertEqual(response.status_code, requests.codes.ok)
464 res = response.json()
465 self.assertIn('PCE calculation in progress',
466 res['output']['configuration-response-common']['response-message'])
467 time.sleep(self.WAITING)
469 def test_029_get_OCH_OTU4_service_BC(self):
470 response = test_utils.get_service_list_request(
471 "services/service-OCH-OTU4-BC")
472 self.assertEqual(response.status_code, requests.codes.ok)
473 res = response.json()
475 res['services'][0]['administrative-state'], 'inService')
477 res['services'][0]['service-name'], 'service-OCH-OTU4-BC')
479 res['services'][0]['connection-type'], 'infrastructure')
481 res['services'][0]['lifecycle-state'], 'planned')
484 def test_030_check_otn_topo_otu4_links(self):
485 response = test_utils.get_otn_topo_request()
486 self.assertEqual(response.status_code, requests.codes.ok)
487 res = response.json()
488 nb_links = len(res['network'][0]['ietf-network-topology:link'])
489 self.assertEqual(nb_links, 4)
490 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
491 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
492 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
493 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2']
494 for link in res['network'][0]['ietf-network-topology:link']:
495 self.assertIn(link['link-id'], listLinkId)
497 link['transportpce-topology:otn-link-type'], 'OTU4')
499 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
501 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
503 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
505 link['org-openroadm-common-network:opposite-link'], listLinkId)
507 # test service-create for ODU4 service from spdrA to spdrC via spdrB
508 def test_031_create_ODU4_service(self):
509 # pylint: disable=line-too-long
510 self.cr_serv_sample_data["input"]["service-name"] = "service-ODU4-ABC"
511 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
512 self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
513 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
514 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
515 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
516 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1"
517 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1"
518 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1"
519 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1"
520 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
521 self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
522 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
523 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
524 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
525 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1"
526 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1"
527 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1"
528 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1"
530 response = test_utils.service_create_request(self.cr_serv_sample_data)
531 self.assertEqual(response.status_code, requests.codes.ok)
532 res = response.json()
533 self.assertIn('PCE calculation in progress',
534 res['output']['configuration-response-common']['response-message'])
535 time.sleep(self.WAITING)
537 def test_032_get_ODU4_service_ABC(self):
538 response = test_utils.get_service_list_request(
539 "services/service-ODU4-ABC")
540 self.assertEqual(response.status_code, requests.codes.ok)
541 res = response.json()
543 res['services'][0]['administrative-state'], 'inService')
545 res['services'][0]['service-name'], 'service-ODU4-ABC')
547 res['services'][0]['connection-type'], 'infrastructure')
549 res['services'][0]['lifecycle-state'], 'planned')
552 def test_033_check_interface_ODU4_spdra(self):
553 response = test_utils.check_netconf_node_request(
554 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
555 self.assertEqual(response.status_code, requests.codes.ok)
556 res = response.json()
557 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
558 'administrative-state': 'inService',
559 'supporting-circuit-pack-name': 'CP1-CFP0',
560 'type': 'org-openroadm-interfaces:otnOdu',
561 'supporting-port': 'CP1-CFP0-P1'}
562 # SAPI/DAPI are added in the Otu4 renderer
563 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
564 'rate': 'org-openroadm-otn-common-types:ODU4',
565 'expected-dapi': 'H/OelLynehI=',
566 'expected-sapi': 'AMf1n5hK6Xkk',
567 'tx-dapi': 'AMf1n5hK6Xkk',
568 'tx-sapi': 'H/OelLynehI='}
570 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
572 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
574 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
576 self.assertDictEqual(
577 {'payload-type': '21', 'exp-payload-type': '21'},
578 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
579 response2 = test_utils.check_netconf_node_request(
580 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
581 self.assertEqual(response.status_code, requests.codes.ok)
582 res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
583 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
584 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
585 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
586 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
588 def test_034_check_interface_ODU4_spdrc(self):
589 response = test_utils.check_netconf_node_request(
590 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
591 self.assertEqual(response.status_code, requests.codes.ok)
592 res = response.json()
593 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
594 'administrative-state': 'inService',
595 'supporting-circuit-pack-name': 'CP1-CFP0',
596 'type': 'org-openroadm-interfaces:otnOdu',
597 'supporting-port': 'CP1-CFP0-P1'}
598 # SAPI/DAPI are added in the Otu4 renderer
599 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
600 'rate': 'org-openroadm-otn-common-types:ODU4',
601 'expected-dapi': 'AMf1n5hK6Xkk',
602 'expected-sapi': 'H/OelLynehI=',
603 'tx-dapi': 'H/OelLynehI=',
604 'tx-sapi': 'AMf1n5hK6Xkk'}
605 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
607 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
609 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
611 self.assertDictEqual(
612 {'payload-type': '21', 'exp-payload-type': '21'},
613 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
614 response2 = test_utils.check_netconf_node_request(
615 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
616 self.assertEqual(response.status_code, requests.codes.ok)
617 res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
618 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
619 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
620 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
621 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
623 def test_035_check_interface_ODU4_NETWORK1_spdrb(self):
624 response = test_utils.check_netconf_node_request(
625 "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
626 self.assertEqual(response.status_code, requests.codes.ok)
627 res = response.json()
628 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
629 'administrative-state': 'inService',
630 'supporting-circuit-pack-name': 'CP5-CFP',
631 'type': 'org-openroadm-interfaces:otnOdu',
632 'supporting-port': 'CP5-CFP-P1'}
634 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
635 'rate': 'org-openroadm-otn-common-types:ODU4',
636 'monitoring-mode': 'monitored'}
638 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
640 self.assertDictEqual(dict(input_dict_2,
641 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
642 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
643 self.assertNotIn('opu',
644 dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
646 def test_036_check_interface_ODU4_NETWORK2_spdrb(self):
647 response = test_utils.check_netconf_node_request(
648 "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
649 self.assertEqual(response.status_code, requests.codes.ok)
650 res = response.json()
651 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
652 'administrative-state': 'inService',
653 'supporting-circuit-pack-name': 'CP6-CFP',
654 'type': 'org-openroadm-interfaces:otnOdu',
655 'supporting-port': 'CP6-CFP-P1'}
657 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
658 'rate': 'org-openroadm-otn-common-types:ODU4',
659 'monitoring-mode': 'monitored'}
661 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
663 self.assertDictEqual(dict(input_dict_2,
664 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
665 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
666 self.assertNotIn('opu',
667 dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
669 def test_037_check_ODU4_connection_spdrb(self):
670 response = test_utils.check_netconf_node_request(
672 "odu-connection/XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4")
673 self.assertEqual(response.status_code, requests.codes.ok)
674 res = response.json()
677 'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
678 'direction': 'bidirectional'
681 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
682 res['odu-connection'][0])
683 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU4'},
684 res['odu-connection'][0]['destination'])
685 self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU4'},
686 res['odu-connection'][0]['source'])
688 def test_038_check_otn_topo_links(self):
689 response = test_utils.get_otn_topo_request()
690 self.assertEqual(response.status_code, requests.codes.ok)
691 res = response.json()
692 nb_links = len(res['network'][0]['ietf-network-topology:link'])
693 self.assertEqual(nb_links, 6)
694 for link in res['network'][0]['ietf-network-topology:link']:
695 if 'OTU4' in link['link-id']:
697 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
699 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
700 elif 'ODTU4' in link['link-id']:
702 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
704 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
706 link['transportpce-topology:otn-link-type'], 'ODTU4')
708 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
709 self.assertIn(link['org-openroadm-common-network:opposite-link'],
710 ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
711 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
713 self.fail("this link should not exist")
715 def test_039_check_otn_topo_tp(self):
716 response = test_utils.get_otn_topo_request()
717 res = response.json()
718 for node in res['network'][0]['node']:
719 if node['node-id'] == 'SPDR-SA1-XPDR1' or node['node-id'] == 'SPDR-SC1-XPDR1':
720 tpList = node['ietf-network-topology:termination-point']
722 if tp['tp-id'] == 'XPDR1-NETWORK1':
723 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
724 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
726 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
727 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
728 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
730 # test service-create for 10GE service from spdr to spdr
731 def test_040_create_10GE_service(self):
732 self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
733 self.cr_serv_sample_data["input"]["connection-type"] = "service"
734 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
735 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
736 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
737 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
738 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
739 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
740 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
741 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
742 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
743 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
744 response = test_utils.service_create_request(self.cr_serv_sample_data)
745 self.assertEqual(response.status_code, requests.codes.ok)
746 res = response.json()
747 self.assertIn('PCE calculation in progress',
748 res['output']['configuration-response-common']['response-message'])
749 time.sleep(self.WAITING)
751 def test_041_get_10GE_service1(self):
752 response = test_utils.get_service_list_request(
753 "services/service1-10GE")
754 self.assertEqual(response.status_code, requests.codes.ok)
755 res = response.json()
757 res['services'][0]['administrative-state'], 'inService')
759 res['services'][0]['service-name'], 'service1-10GE')
761 res['services'][0]['connection-type'], 'service')
763 res['services'][0]['lifecycle-state'], 'planned')
766 def test_042_check_interface_10GE_CLIENT_spdra(self):
767 response = test_utils.check_netconf_node_request(
768 "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
769 self.assertEqual(response.status_code, requests.codes.ok)
770 res = response.json()
771 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
772 'administrative-state': 'inService',
773 'supporting-circuit-pack-name': 'CP1-SFP4',
774 'type': 'org-openroadm-interfaces:ethernetCsmacd',
775 'supporting-port': 'CP1-SFP4-P1'
777 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
779 self.assertDictEqual(
781 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
783 def test_043_check_interface_ODU2E_CLIENT_spdra(self):
784 response = test_utils.check_netconf_node_request(
785 "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
786 self.assertEqual(response.status_code, requests.codes.ok)
787 res = response.json()
788 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
789 'administrative-state': 'inService',
790 'supporting-circuit-pack-name': 'CP1-SFP4',
791 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
792 'type': 'org-openroadm-interfaces:otnOdu',
793 'supporting-port': 'CP1-SFP4-P1'}
795 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
796 'rate': 'org-openroadm-otn-common-types:ODU2e',
797 'monitoring-mode': 'terminated',
798 'expected-dapi': 'B68VWipZAU0=',
799 'expected-sapi': 'BcwI5xz79t8=',
800 'tx-dapi': 'BcwI5xz79t8=',
801 'tx-sapi': 'B68VWipZAU0='}
803 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
805 self.assertDictEqual(dict(input_dict_2,
806 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
807 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
808 self.assertDictEqual(
809 {'payload-type': '03', 'exp-payload-type': '03'},
810 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
811 response2 = test_utils.check_netconf_node_request(
812 "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
813 self.assertEqual(response.status_code, requests.codes.ok)
814 res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
815 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
816 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
817 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
818 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
820 def test_044_check_interface_ODU2E_NETWORK_spdra(self):
821 response = test_utils.check_netconf_node_request(
822 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
823 self.assertEqual(response.status_code, requests.codes.ok)
824 res = response.json()
825 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
826 'administrative-state': 'inService',
827 'supporting-circuit-pack-name': 'CP1-CFP0',
828 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
829 'type': 'org-openroadm-interfaces:otnOdu',
830 'supporting-port': 'CP1-CFP0-P1'}
832 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
833 'rate': 'org-openroadm-otn-common-types:ODU2e',
834 'monitoring-mode': 'monitored'}
835 input_dict_3 = {'trib-port-number': 1}
837 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
839 self.assertDictEqual(dict(input_dict_2,
840 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
841 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
842 self.assertDictEqual(dict(input_dict_3,
843 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
844 'parent-odu-allocation']),
845 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
846 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
849 def test_045_check_ODU2E_connection_spdra(self):
850 response = test_utils.check_netconf_node_request(
852 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
853 self.assertEqual(response.status_code, requests.codes.ok)
854 res = response.json()
857 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
858 'direction': 'bidirectional'
861 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
862 res['odu-connection'][0])
863 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'},
864 res['odu-connection'][0]['destination'])
865 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'},
866 res['odu-connection'][0]['source'])
868 def test_046_check_interface_10GE_CLIENT_spdrc(self):
869 response = test_utils.check_netconf_node_request(
870 "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
871 self.assertEqual(response.status_code, requests.codes.ok)
872 res = response.json()
873 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
874 'administrative-state': 'inService',
875 'supporting-circuit-pack-name': 'CP1-SFP4',
876 'type': 'org-openroadm-interfaces:ethernetCsmacd',
877 'supporting-port': 'CP1-SFP4-P1'
879 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
881 self.assertDictEqual(
883 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
885 def test_047_check_interface_ODU2E_CLIENT_spdrc(self):
886 response = test_utils.check_netconf_node_request(
887 "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
888 self.assertEqual(response.status_code, requests.codes.ok)
889 res = response.json()
890 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
891 'administrative-state': 'inService',
892 'supporting-circuit-pack-name': 'CP1-SFP4',
893 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
894 'type': 'org-openroadm-interfaces:otnOdu',
895 'supporting-port': 'CP1-SFP4-P1'}
897 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
898 'rate': 'org-openroadm-otn-common-types:ODU2e',
899 'monitoring-mode': 'terminated',
900 'expected-dapi': 'BcwI5xz79t8=',
901 'expected-sapi': 'B68VWipZAU0=',
902 'tx-dapi': 'B68VWipZAU0=',
903 'tx-sapi': 'BcwI5xz79t8='}
905 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
907 self.assertDictEqual(dict(input_dict_2,
908 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
909 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
910 self.assertDictEqual(
911 {'payload-type': '03', 'exp-payload-type': '03'},
912 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
913 response2 = test_utils.check_netconf_node_request(
914 "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
915 self.assertEqual(response.status_code, requests.codes.ok)
916 res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
917 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
918 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
919 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
920 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
922 def test_048_check_interface_ODU2E_NETWORK_spdrc(self):
923 response = test_utils.check_netconf_node_request(
924 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
925 self.assertEqual(response.status_code, requests.codes.ok)
926 res = response.json()
927 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
928 'administrative-state': 'inService',
929 'supporting-circuit-pack-name': 'CP1-CFP0',
930 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
931 'type': 'org-openroadm-interfaces:otnOdu',
932 'supporting-port': 'CP1-CFP0-P1'}
934 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
935 'rate': 'org-openroadm-otn-common-types:ODU2e',
936 'monitoring-mode': 'monitored'}
937 input_dict_3 = {'trib-port-number': 1}
939 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
941 self.assertDictEqual(dict(input_dict_2,
942 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
943 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
944 self.assertDictEqual(dict(input_dict_3,
945 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
946 'parent-odu-allocation']),
947 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
948 'parent-odu-allocation'])
951 'org-openroadm-otn-odu-interfaces:odu'][
952 'parent-odu-allocation']['trib-slots'])
954 def test_049_check_ODU2E_connection_spdrc(self):
955 response = test_utils.check_netconf_node_request(
957 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
958 self.assertEqual(response.status_code, requests.codes.ok)
959 res = response.json()
962 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
963 'direction': 'bidirectional'
966 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
967 res['odu-connection'][0])
968 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'},
969 res['odu-connection'][0]['destination'])
970 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'},
971 res['odu-connection'][0]['source'])
973 def test_050_check_otn_topo_links(self):
974 response = test_utils.get_otn_topo_request()
975 self.assertEqual(response.status_code, requests.codes.ok)
976 res = response.json()
977 nb_links = len(res['network'][0]['ietf-network-topology:link'])
978 self.assertEqual(nb_links, 6)
979 for link in res['network'][0]['ietf-network-topology:link']:
980 linkId = link['link-id']
981 if (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
982 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
984 link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
986 link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
988 def test_051_check_otn_topo_tp(self):
989 response = test_utils.get_otn_topo_request()
990 res = response.json()
991 for node in res['network'][0]['node']:
992 if node['node-id'] == 'SPDR-SA1-XPDR1' or node['node-id'] == 'SPDR-SC1-XPDR1':
993 tpList = node['ietf-network-topology:termination-point']
995 if tp['tp-id'] == 'XPDR1-NETWORK1':
996 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
997 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
998 tsPoolList = list(range(1, 9))
1000 tsPoolList, xpdrTpPortConAt['ts-pool'])
1002 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1004 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1006 def test_052_delete_10GE_service(self):
1007 response = test_utils.service_delete_request("service1-10GE")
1008 self.assertEqual(response.status_code, requests.codes.ok)
1009 res = response.json()
1010 self.assertIn('Renderer service delete in progress',
1011 res['output']['configuration-response-common']['response-message'])
1012 time.sleep(self.WAITING)
1014 def test_053_check_service_list(self):
1015 response = test_utils.get_service_list_request("")
1016 self.assertEqual(response.status_code, requests.codes.ok)
1017 res = response.json()
1018 self.assertEqual(len(res['service-list']['services']), 3)
1021 def test_054_check_no_ODU2e_connection_spdra(self):
1022 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1023 self.assertEqual(response.status_code, requests.codes.ok)
1024 res = response.json()
1025 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1028 def test_055_check_no_interface_ODU2E_NETWORK_spdra(self):
1029 response = test_utils.check_netconf_node_request(
1030 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
1031 self.assertEqual(response.status_code, requests.codes.conflict)
1033 def test_056_check_no_interface_ODU2E_CLIENT_spdra(self):
1034 response = test_utils.check_netconf_node_request(
1035 "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
1036 self.assertEqual(response.status_code, requests.codes.conflict)
1038 def test_057_check_no_interface_10GE_CLIENT_spdra(self):
1039 response = test_utils.check_netconf_node_request(
1040 "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
1041 self.assertEqual(response.status_code, requests.codes.conflict)
1043 def test_058_check_otn_topo_links(self):
1044 response = test_utils.get_otn_topo_request()
1045 self.assertEqual(response.status_code, requests.codes.ok)
1046 res = response.json()
1047 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1048 self.assertEqual(nb_links, 6)
1049 for link in res['network'][0]['ietf-network-topology:link']:
1050 linkId = link['link-id']
1051 if (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
1052 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
1054 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1056 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1058 def test_059_check_otn_topo_tp(self):
1059 response = test_utils.get_otn_topo_request()
1060 res = response.json()
1061 for node in res['network'][0]['node']:
1062 if (node['node-id'] == 'SPDR-SA1-XPDR1' or node['node-id'] == 'SPDR-SC1-XPDR1'):
1063 tpList = node['ietf-network-topology:termination-point']
1065 if tp['tp-id'] == 'XPDR1-NETWORK1':
1066 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1067 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1069 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1071 def test_060_delete_ODU4_service(self):
1072 response = test_utils.service_delete_request("service-ODU4-ABC")
1073 self.assertEqual(response.status_code, requests.codes.ok)
1074 res = response.json()
1075 self.assertIn('Renderer service delete in progress',
1076 res['output']['configuration-response-common']['response-message'])
1077 time.sleep(self.WAITING)
1079 def test_061_check_service_list(self):
1080 response = test_utils.get_service_list_request("")
1081 self.assertEqual(response.status_code, requests.codes.ok)
1082 res = response.json()
1083 self.assertEqual(len(res['service-list']['services']), 2)
1086 def test_062_check_no_interface_ODU4_spdra(self):
1087 response = test_utils.check_netconf_node_request(
1088 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
1089 self.assertEqual(response.status_code, requests.codes.conflict)
1091 def test_063_check_no_interface_ODU4_spdrb(self):
1092 response = test_utils.check_netconf_node_request(
1093 "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
1094 self.assertEqual(response.status_code, requests.codes.conflict)
1095 response = test_utils.check_netconf_node_request(
1096 "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
1097 self.assertEqual(response.status_code, requests.codes.conflict)
1099 def test_064_check_no_ODU4_connection_spdrb(self):
1100 response = test_utils.check_netconf_node_request("SPDR-SB1", "")
1101 self.assertEqual(response.status_code, requests.codes.ok)
1102 res = response.json()
1103 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1106 def test_065_check_no_interface_ODU4_spdrc(self):
1107 response = test_utils.check_netconf_node_request(
1108 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
1109 self.assertEqual(response.status_code, requests.codes.conflict)
1111 def test_066_check_otn_topo_links(self):
1112 self.test_030_check_otn_topo_otu4_links()
1114 def test_067_check_otn_topo_tp(self):
1115 response = test_utils.get_otn_topo_request()
1116 res = response.json()
1117 for node in res['network'][0]['node']:
1118 if node['node-id'] == 'SPDR-SA1-XPDR1' or node['node-id'] == 'SPDR-SC1-XPDR1':
1119 tpList = node['ietf-network-topology:termination-point']
1121 if tp['tp-id'] == 'XPDR1-NETWORK1':
1122 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1123 self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
1125 'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
1127 # test service-create for ODU4 service from spdrA to spdrB
1128 def test_068_create_ODU4_service_AB(self):
1129 # pylint: disable=line-too-long
1130 self.cr_serv_sample_data["input"]["service-name"] = "service-ODU4-AB"
1131 self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1132 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1133 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
1134 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1135 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1"
1136 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1"
1137 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1"
1138 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1"
1139 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SB1"
1140 self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSB"
1141 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1142 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
1143 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1144 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
1145 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1146 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
1147 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1"
1149 response = test_utils.service_create_request(self.cr_serv_sample_data)
1150 self.assertEqual(response.status_code, requests.codes.ok)
1151 res = response.json()
1152 self.assertIn('PCE calculation in progress',
1153 res['output']['configuration-response-common']['response-message'])
1154 time.sleep(self.WAITING)
1156 def test_069_get_ODU4_service_AB(self):
1157 response = test_utils.get_service_list_request(
1158 "services/service-ODU4-AB")
1159 self.assertEqual(response.status_code, requests.codes.ok)
1160 res = response.json()
1162 res['services'][0]['administrative-state'], 'inService')
1164 res['services'][0]['service-name'], 'service-ODU4-AB')
1166 res['services'][0]['connection-type'], 'infrastructure')
1168 res['services'][0]['lifecycle-state'], 'planned')
1171 def test_070_check_interface_ODU4_spdra(self):
1172 response = test_utils.check_netconf_node_request(
1173 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
1174 self.assertEqual(response.status_code, requests.codes.ok)
1175 res = response.json()
1176 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
1177 'administrative-state': 'inService',
1178 'supporting-circuit-pack-name': 'CP1-CFP0',
1179 'type': 'org-openroadm-interfaces:otnOdu',
1180 'supporting-port': 'CP1-CFP0-P1'}
1181 # SAPI/DAPI are added in the Otu4 renderer
1182 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
1183 'rate': 'org-openroadm-otn-common-types:ODU4',
1184 'expected-dapi': 'H/OelLynehI=',
1185 'expected-sapi': 'X+8cRNi+HbE=',
1186 'tx-dapi': 'X+8cRNi+HbE=',
1187 'tx-sapi': 'H/OelLynehI='}
1189 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1190 res['interface'][0])
1191 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1193 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1195 self.assertDictEqual(
1196 {'payload-type': '21', 'exp-payload-type': '21'},
1197 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1198 response2 = test_utils.check_netconf_node_request(
1199 "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
1200 self.assertEqual(response.status_code, requests.codes.ok)
1201 res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1202 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
1203 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
1204 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
1205 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
1207 def test_071_check_interface_ODU4_spdrb_N1(self):
1208 response = test_utils.check_netconf_node_request(
1209 "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
1210 self.assertEqual(response.status_code, requests.codes.ok)
1211 res = response.json()
1212 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1213 'administrative-state': 'inService',
1214 'supporting-circuit-pack-name': 'CP1-CFP0',
1215 'type': 'org-openroadm-interfaces:otnOdu',
1216 'supporting-port': 'CP1-CFP0-P1'}
1217 # SAPI/DAPI are added in the Otu4 renderer
1218 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
1219 'rate': 'org-openroadm-otn-common-types:ODU4',
1220 'expected-dapi': 'X+8cRNi+HbE=',
1221 'expected-sapi': 'H/OelLynehI=',
1222 'tx-dapi': 'H/OelLynehI=',
1223 'tx-sapi': 'X+8cRNi+HbE='}
1225 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1226 res['interface'][0])
1227 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1229 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1231 self.assertDictEqual(
1232 {'payload-type': '21', 'exp-payload-type': '21'},
1233 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1234 response2 = test_utils.check_netconf_node_request(
1235 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
1236 self.assertEqual(response.status_code, requests.codes.ok)
1237 res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1238 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
1239 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
1240 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
1241 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
1243 # test service-create for ODU4 service from spdrB to spdrC
1244 def test_072_create_ODU4_service_BC(self):
1245 # pylint: disable=line-too-long
1246 self.cr_serv_sample_data["input"]["service-name"] = "service-ODU4-BC"
1247 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
1248 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
1249 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
1250 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2"
1251 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
1252 self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
1253 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1"
1254 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1"
1255 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1"
1256 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1"
1258 response = test_utils.service_create_request(self.cr_serv_sample_data)
1259 self.assertEqual(response.status_code, requests.codes.ok)
1260 res = response.json()
1261 self.assertIn('PCE calculation in progress',
1262 res['output']['configuration-response-common']['response-message'])
1263 time.sleep(self.WAITING)
1265 def test_073_get_ODU4_service_AB(self):
1266 response = test_utils.get_service_list_request(
1267 "services/service-ODU4-BC")
1268 self.assertEqual(response.status_code, requests.codes.ok)
1269 res = response.json()
1271 res['services'][0]['administrative-state'], 'inService')
1273 res['services'][0]['service-name'], 'service-ODU4-BC')
1275 res['services'][0]['connection-type'], 'infrastructure')
1277 res['services'][0]['lifecycle-state'], 'planned')
1280 def test_074_check_interface_ODU4_spdrb_N2(self):
1281 response = test_utils.check_netconf_node_request(
1282 "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
1283 self.assertEqual(response.status_code, requests.codes.ok)
1284 res = response.json()
1285 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1286 'administrative-state': 'inService',
1287 'supporting-circuit-pack-name': 'CP1-CFP0',
1288 'type': 'org-openroadm-interfaces:otnOdu',
1289 'supporting-port': 'CP1-CFP0-P1'}
1290 # SAPI/DAPI are added in the Otu4 renderer
1291 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
1292 'rate': 'org-openroadm-otn-common-types:ODU4',
1293 'expected-dapi': 'X+8cRNi+HbI=',
1294 'expected-sapi': 'AMf1n5hK6Xkk',
1295 'tx-dapi': 'AMf1n5hK6Xkk',
1296 'tx-sapi': 'X+8cRNi+HbI='}
1298 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1299 res['interface'][0])
1300 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1302 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1304 self.assertDictEqual(
1305 {'payload-type': '21', 'exp-payload-type': '21'},
1306 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1307 response2 = test_utils.check_netconf_node_request(
1308 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
1309 self.assertEqual(response.status_code, requests.codes.ok)
1310 res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1311 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
1312 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
1313 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
1314 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
1316 def test_075_check_interface_ODU4_spdrc(self):
1317 response = test_utils.check_netconf_node_request(
1318 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
1319 self.assertEqual(response.status_code, requests.codes.ok)
1320 res = response.json()
1321 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
1322 'administrative-state': 'inService',
1323 'supporting-circuit-pack-name': 'CP1-CFP0',
1324 'type': 'org-openroadm-interfaces:otnOdu',
1325 'supporting-port': 'CP1-CFP0-P1'}
1326 # SAPI/DAPI are added in the Otu4 renderer
1327 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
1328 'rate': 'org-openroadm-otn-common-types:ODU4',
1329 'expected-dapi': 'AMf1n5hK6Xkk',
1330 'expected-sapi': 'X+8cRNi+HbI=',
1331 'tx-dapi': 'X+8cRNi+HbI=',
1332 'tx-sapi': 'AMf1n5hK6Xkk'}
1333 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1334 res['interface'][0])
1335 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1337 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1339 self.assertDictEqual(
1340 {'payload-type': '21', 'exp-payload-type': '21'},
1341 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1342 response2 = test_utils.check_netconf_node_request(
1343 "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
1344 self.assertEqual(response.status_code, requests.codes.ok)
1345 res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1346 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
1347 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
1348 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
1349 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
1351 # test service-create for 10GE service from spdr to spdr
1352 def test_076_create_10GE_service_ABC(self):
1353 # pylint: disable=line-too-long
1354 self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
1355 self.cr_serv_sample_data["input"]["connection-type"] = "service"
1356 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1"
1357 self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA"
1358 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
1359 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1360 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
1361 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1"
1362 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1363 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1"
1364 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1365 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
1366 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1367 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
1368 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1369 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1"
1370 response = test_utils.service_create_request(self.cr_serv_sample_data)
1371 self.assertEqual(response.status_code, requests.codes.ok)
1372 res = response.json()
1373 self.assertIn('PCE calculation in progress',
1374 res['output']['configuration-response-common']['response-message'])
1375 time.sleep(self.WAITING)
1377 def test_077_check_configuration_spdra_spdrc(self):
1378 self.test_041_get_10GE_service1()
1379 self.test_042_check_interface_10GE_CLIENT_spdra()
1380 self.test_043_check_interface_ODU2E_CLIENT_spdra()
1381 self.test_044_check_interface_ODU2E_NETWORK_spdra()
1382 self.test_045_check_ODU2E_connection_spdra()
1383 self.test_046_check_interface_10GE_CLIENT_spdrc()
1384 self.test_047_check_interface_ODU2E_CLIENT_spdrc()
1385 self.test_049_check_ODU2E_connection_spdrc()
1387 def test_078_check_interface_ODU2E_NETWORK1_spdrb(self):
1388 response = test_utils.check_netconf_node_request(
1389 "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU2e-service1-10GE")
1390 self.assertEqual(response.status_code, requests.codes.ok)
1391 res = response.json()
1392 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU2e-service1-10GE',
1393 'administrative-state': 'inService',
1394 'supporting-circuit-pack-name': 'CP1-CFP0',
1395 'supporting-interface': 'XPDR2-NETWORK1-ODU4',
1396 'type': 'org-openroadm-interfaces:otnOdu',
1397 'supporting-port': 'CP1-CFP0-P1'}
1399 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1400 'rate': 'org-openroadm-otn-common-types:ODU2e',
1401 'monitoring-mode': 'monitored'}
1402 input_dict_3 = {'trib-port-number': 1}
1404 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1405 res['interface'][0])
1406 self.assertDictEqual(dict(input_dict_2,
1407 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1408 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1409 self.assertDictEqual(dict(input_dict_3,
1410 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1411 'parent-odu-allocation']),
1412 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1413 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1416 def test_079_check_interface_ODU2E_NETWORK2_spdrb(self):
1417 response = test_utils.check_netconf_node_request(
1418 "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU2e-service1-10GE")
1419 self.assertEqual(response.status_code, requests.codes.ok)
1420 res = response.json()
1421 input_dict_1 = {'name': 'XPDR2-NETWORK2-ODU2e-service1-10GE',
1422 'administrative-state': 'inService',
1423 'supporting-circuit-pack-name': 'CP1-CFP0',
1424 'supporting-interface': 'XPDR2-NETWORK2-ODU4',
1425 'type': 'org-openroadm-interfaces:otnOdu',
1426 'supporting-port': 'CP1-CFP0-P1'}
1428 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1429 'rate': 'org-openroadm-otn-common-types:ODU2e',
1430 'monitoring-mode': 'monitored'}
1431 input_dict_3 = {'trib-port-number': 1}
1433 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1434 res['interface'][0])
1435 self.assertDictEqual(dict(input_dict_2,
1436 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1437 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1438 self.assertDictEqual(dict(input_dict_3,
1439 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1440 'parent-odu-allocation']),
1441 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1442 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1445 def test_080_check_ODU2E_connection_spdrb(self):
1446 response = test_utils.check_netconf_node_request(
1448 "odu-connection/XPDR2-NETWORK1-ODU2e-service1-10GE-x-XPDR2-NETWORK2-ODU2e-service1-10GE")
1449 self.assertEqual(response.status_code, requests.codes.ok)
1450 res = response.json()
1453 'XPDR2-NETWORK1-ODU2e-service1-10GE-x-XPDR2-NETWORK2-ODU2e-service1-10GE',
1454 'direction': 'bidirectional'
1457 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1458 res['odu-connection'][0])
1459 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU2e-service1-10GE'},
1460 res['odu-connection'][0]['destination'])
1461 self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU2e-service1-10GE'},
1462 res['odu-connection'][0]['source'])
1464 def test_081_check_otn_topo_links(self):
1465 response = test_utils.get_otn_topo_request()
1466 self.assertEqual(response.status_code, requests.codes.ok)
1467 res = response.json()
1468 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1469 self.assertEqual(nb_links, 8)
1470 for link in res['network'][0]['ietf-network-topology:link']:
1471 linkId = link['link-id']
1472 if (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
1473 'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
1474 'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
1475 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2',)):
1477 link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
1479 link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
1481 def test_082_check_otn_topo_tp(self):
1482 self.test_051_check_otn_topo_tp()
1484 def test_083_delete_10GE_service(self):
1485 response = test_utils.service_delete_request("service1-10GE")
1486 self.assertEqual(response.status_code, requests.codes.ok)
1487 res = response.json()
1488 self.assertIn('Renderer service delete in progress',
1489 res['output']['configuration-response-common']['response-message'])
1490 time.sleep(self.WAITING)
1492 def test_084_check_service_list(self):
1493 response = test_utils.get_service_list_request("")
1494 self.assertEqual(response.status_code, requests.codes.ok)
1495 res = response.json()
1496 self.assertEqual(len(res['service-list']['services']), 4)
1499 def test_085_check_configuration_spdra(self):
1500 self.test_054_check_no_ODU2e_connection_spdra()
1501 self.test_055_check_no_interface_ODU2E_NETWORK_spdra()
1502 self.test_056_check_no_interface_ODU2E_CLIENT_spdra()
1503 self.test_057_check_no_interface_10GE_CLIENT_spdra()
1505 def test_086_check_no_ODU2e_connection_spdrb(self):
1506 response = test_utils.check_netconf_node_request("SPDR-SB1", "")
1507 self.assertEqual(response.status_code, requests.codes.ok)
1508 res = response.json()
1509 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1512 def test_087_check_no_interface_ODU2E_NETWORK1_spdrb(self):
1513 response = test_utils.check_netconf_node_request(
1514 "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU2e-service1")
1515 self.assertEqual(response.status_code, requests.codes.conflict)
1517 def test_088_check_no_interface_ODU2E_NETWORK2_spdrb(self):
1518 response = test_utils.check_netconf_node_request(
1519 "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU2e-service1")
1520 self.assertEqual(response.status_code, requests.codes.conflict)
1522 def test_089_check_otn_topo_links(self):
1523 response = test_utils.get_otn_topo_request()
1524 self.assertEqual(response.status_code, requests.codes.ok)
1525 res = response.json()
1526 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1527 self.assertEqual(nb_links, 8)
1528 for link in res['network'][0]['ietf-network-topology:link']:
1529 linkId = link['link-id']
1530 if (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
1531 'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1',
1532 'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
1533 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2',)):
1535 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1537 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1539 def test_090_check_otn_topo_tp(self):
1540 self.test_059_check_otn_topo_tp()
1542 def test_091_delete_ODU4_service_AB(self):
1543 response = test_utils.service_delete_request("service-ODU4-AB")
1544 self.assertEqual(response.status_code, requests.codes.ok)
1545 res = response.json()
1546 self.assertIn('Renderer service delete in progress',
1547 res['output']['configuration-response-common']['response-message'])
1548 time.sleep(self.WAITING)
1550 def test_092_delete_ODU4_service_BC(self):
1551 response = test_utils.service_delete_request("service-ODU4-BC")
1552 self.assertEqual(response.status_code, requests.codes.ok)
1553 res = response.json()
1554 self.assertIn('Renderer service delete in progress',
1555 res['output']['configuration-response-common']['response-message'])
1556 time.sleep(self.WAITING)
1558 def test_093_check_global_config(self):
1559 self.test_061_check_service_list()
1560 self.test_062_check_no_interface_ODU4_spdra()
1561 self.test_063_check_no_interface_ODU4_spdrb()
1562 self.test_064_check_no_ODU4_connection_spdrb()
1563 self.test_065_check_no_interface_ODU4_spdrc()
1564 self.test_066_check_otn_topo_links()
1565 self.test_067_check_otn_topo_tp()
1567 def test_094_delete_OCH_OTU4_service_AB(self):
1568 response = test_utils.service_delete_request("service-OCH-OTU4-AB")
1569 self.assertEqual(response.status_code, requests.codes.ok)
1570 res = response.json()
1571 self.assertIn('Renderer service delete in progress',
1572 res['output']['configuration-response-common']['response-message'])
1573 time.sleep(self.WAITING)
1575 def test_095_delete_OCH_OTU4_service_BC(self):
1576 response = test_utils.service_delete_request("service-OCH-OTU4-BC")
1577 self.assertEqual(response.status_code, requests.codes.ok)
1578 res = response.json()
1579 self.assertIn('Renderer service delete in progress',
1580 res['output']['configuration-response-common']['response-message'])
1581 time.sleep(self.WAITING)
1583 def test_096_get_no_service(self):
1584 response = test_utils.get_service_list_request("")
1585 self.assertEqual(response.status_code, requests.codes.conflict)
1586 res = response.json()
1588 {"error-type": "application", "error-tag": "data-missing",
1589 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1590 res['errors']['error'])
1593 def test_097_check_no_interface_OTU4_spdra(self):
1594 response = test_utils.check_netconf_node_request(
1595 "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
1596 self.assertEqual(response.status_code, requests.codes.conflict)
1598 def test_098_check_no_interface_OCH_spdra(self):
1599 response = test_utils.check_netconf_node_request(
1600 "SPDR-SA1", "interface/XPDR1-NETWORK1-1")
1601 self.assertEqual(response.status_code, requests.codes.conflict)
1603 def test_099_getLinks_OtnTopology(self):
1604 response = test_utils.get_otn_topo_request()
1605 self.assertEqual(response.status_code, requests.codes.ok)
1606 res = response.json()
1607 self.assertNotIn('ietf-network-topology:link', res['network'][0])
1609 def test_100_disconnect_xponders_from_roadm(self):
1610 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1611 response = test_utils.get_ordm_topo_request("")
1612 self.assertEqual(response.status_code, requests.codes.ok)
1613 res = response.json()
1614 links = res['network'][0]['ietf-network-topology:link']
1616 if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
1617 link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
1618 link_name = link["link-id"]
1619 response = test_utils.delete_request(url+link_name)
1620 self.assertEqual(response.status_code, requests.codes.ok)
1622 def test_101_check_openroadm_topology(self):
1623 response = test_utils.get_ordm_topo_request("")
1624 self.assertEqual(response.status_code, requests.codes.ok)
1625 res = response.json()
1626 links = res['network'][0]['ietf-network-topology:link']
1627 self.assertEqual(28, len(links), 'Topology should contain 28 links')
1629 def test_102_disconnect_spdrA(self):
1630 response = test_utils.unmount_device("SPDR-SA1")
1631 self.assertEqual(response.status_code, requests.codes.ok,
1632 test_utils.CODE_SHOULD_BE_200)
1634 def test_103_disconnect_spdrC(self):
1635 response = test_utils.unmount_device("SPDR-SC1")
1636 self.assertEqual(response.status_code, requests.codes.ok,
1637 test_utils.CODE_SHOULD_BE_200)
1639 def test_104_disconnect_spdrB(self):
1640 response = test_utils.unmount_device("SPDR-SB1")
1641 self.assertEqual(response.status_code, requests.codes.ok,
1642 test_utils.CODE_SHOULD_BE_200)
1644 def test_105_disconnect_roadmA(self):
1645 response = test_utils.unmount_device("ROADM-A1")
1646 self.assertEqual(response.status_code, requests.codes.ok,
1647 test_utils.CODE_SHOULD_BE_200)
1649 def test_106_disconnect_roadmB(self):
1650 response = test_utils.unmount_device("ROADM-B1")
1651 self.assertEqual(response.status_code, requests.codes.ok,
1652 test_utils.CODE_SHOULD_BE_200)
1654 def test_107_disconnect_roadmC(self):
1655 response = test_utils.unmount_device("ROADM-C1")
1656 self.assertEqual(response.status_code, requests.codes.ok,
1657 test_utils.CODE_SHOULD_BE_200)
1660 if __name__ == "__main__":
1661 unittest.main(verbosity=2)