From: Gilles Thouenon Date: Wed, 6 Oct 2021 10:05:07 +0000 (+0200) Subject: New functional tests with intermediate otn switch X-Git-Tag: 5.0.0~147 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F36%2F97836%2F18;p=transportpce.git New functional tests with intermediate otn switch Create a new functional test suite testing the end-to-end 10GE service management in a configuration with an intermediate otn-switch. It tests the creation of an end-to-end structured ODU4 over multiple OTU4, and the creation of an end-to-end 10GE service over multiple structured ODU4. JIRA: TRNSPRTPCE-542 Signed-off-by: Gilles Thouenon Change-Id: If33dd450b466b823733521151f364df8a45f1eba --- diff --git a/tests/transportpce_tests/2.2.1/test15_otn_end2end_with_intermediate_switch.py b/tests/transportpce_tests/2.2.1/test15_otn_end2end_with_intermediate_switch.py new file mode 100644 index 000000000..3cc1e32bf --- /dev/null +++ b/tests/transportpce_tests/2.2.1/test15_otn_end2end_with_intermediate_switch.py @@ -0,0 +1,1661 @@ +#!/usr/bin/env python + +############################################################################## +# Copyright (c) 2021 Orange, Inc. and others. All rights reserved. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +# pylint: disable=no-member +# pylint: disable=too-many-public-methods +# pylint: disable=too-many-lines + +import unittest +import time +import requests +# pylint: disable=wrong-import-order +import sys +sys.path.append('transportpce_tests/common/') +# pylint: disable=wrong-import-position +# pylint: disable=import-error +import test_utils # nopep8 + + +class TransportPCEtesting(unittest.TestCase): + + processes = None + WAITING = 20 # nominal value is 300 + NODE_VERSION = '2.2.1' + + cr_serv_sample_data = {"input": { + "sdnc-request-header": { + "request-id": "request-1", + "rpc-action": "service-create", + "request-system-id": "appname" + }, + "service-name": "service-OCH-OTU4-AB", + "common-id": "commonId", + "connection-type": "infrastructure", + "service-a-end": { + "service-rate": "100", + "node-id": "SPDR-SA1", + "service-format": "OTU", + "otu-service-rate": "org-openroadm-otn-common-types:OTU4", + "clli": "NodeSA", + "subrate-eth-sla": { + "subrate-eth-sla": { + "committed-info-rate": "100000", + "committed-burst-size": "64" + } + }, + "tx-direction": { + "port": { + "port-device-name": "SPDR-SA1-XPDR1", + "port-type": "fixed", + "port-name": "XPDR1-NETWORK1", + "port-rack": "000000.00", + "port-shelf": "Chassis#1" + }, + "lgx": { + "lgx-device-name": "Some lgx-device-name", + "lgx-port-name": "Some lgx-port-name", + "lgx-port-rack": "000000.00", + "lgx-port-shelf": "00" + } + }, + "rx-direction": { + "port": { + "port-device-name": "SPDR-SA1-XPDR1", + "port-type": "fixed", + "port-name": "XPDR1-NETWORK1", + "port-rack": "000000.00", + "port-shelf": "Chassis#1" + }, + "lgx": { + "lgx-device-name": "Some lgx-device-name", + "lgx-port-name": "Some lgx-port-name", + "lgx-port-rack": "000000.00", + "lgx-port-shelf": "00" + } + }, + "optic-type": "gray" + }, + "service-z-end": { + "service-rate": "100", + "node-id": "SPDR-SB1", + "service-format": "OTU", + "otu-service-rate": "org-openroadm-otn-common-types:OTU4", + "clli": "NodeSB", + "subrate-eth-sla": { + "subrate-eth-sla": { + "committed-info-rate": "100000", + "committed-burst-size": "64" + } + }, + "tx-direction": { + "port": { + "port-device-name": "SPDR-SB1-XPDR2", + "port-type": "fixed", + "port-name": "XPDR2-NETWORK1", + "port-rack": "000000.00", + "port-shelf": "Chassis#1" + }, + "lgx": { + "lgx-device-name": "Some lgx-device-name", + "lgx-port-name": "Some lgx-port-name", + "lgx-port-rack": "000000.00", + "lgx-port-shelf": "00" + } + }, + "rx-direction": { + "port": { + "port-device-name": "SPDR-SB1-XPDR2", + "port-type": "fixed", + "port-name": "XPDR2-NETWORK1", + "port-rack": "000000.00", + "port-shelf": "Chassis#1" + }, + "lgx": { + "lgx-device-name": "Some lgx-device-name", + "lgx-port-name": "Some lgx-port-name", + "lgx-port-rack": "000000.00", + "lgx-port-shelf": "00" + } + }, + "optic-type": "gray" + }, + "due-date": "2018-06-15T00:00:01Z", + "operator-contact": "pw1234" + } + } + + @classmethod + def setUpClass(cls): + cls.processes = test_utils.start_tpce() + cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION), + ('spdrb', cls.NODE_VERSION), + ('spdrc', cls.NODE_VERSION), + ('roadma', cls.NODE_VERSION), + ('roadmb', cls.NODE_VERSION), + ('roadmc', cls.NODE_VERSION)]) + + @classmethod + def tearDownClass(cls): + # pylint: disable=not-an-iterable + for process in cls.processes: + test_utils.shutdown_process(process) + print("all processes killed") + + def setUp(self): + time.sleep(2) + + def test_001_connect_spdrA(self): + response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION)) + self.assertEqual(response.status_code, + requests.codes.created, test_utils.CODE_SHOULD_BE_201) + + def test_002_connect_spdrB(self): + response = test_utils.mount_device("SPDR-SB1", ('spdrb', self.NODE_VERSION)) + self.assertEqual(response.status_code, + requests.codes.created, test_utils.CODE_SHOULD_BE_201) + + def test_003_connect_spdrC(self): + response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION)) + self.assertEqual(response.status_code, + requests.codes.created, test_utils.CODE_SHOULD_BE_201) + + def test_004_connect_rdmA(self): + response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION)) + self.assertEqual(response.status_code, + requests.codes.created, test_utils.CODE_SHOULD_BE_201) + + def test_005_connect_rdmB(self): + response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION)) + self.assertEqual(response.status_code, + requests.codes.created, test_utils.CODE_SHOULD_BE_201) + + def test_006_connect_rdmC(self): + response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION)) + self.assertEqual(response.status_code, + requests.codes.created, test_utils.CODE_SHOULD_BE_201) + + def test_007_connect_sprdA_1_N1_to_roadmA_PP1(self): + response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1", + "ROADM-A1", "1", "SRG1-PP1-TXRX") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Xponder Roadm Link created successfully', + res["output"]["result"]) + time.sleep(2) + + def test_008_connect_roadmA_PP1_to_spdrA_1_N1(self): + response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1", + "ROADM-A1", "1", "SRG1-PP1-TXRX") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Roadm Xponder links created successfully', + res["output"]["result"]) + time.sleep(2) + + def test_009_connect_sprdC_1_N1_to_roadmC_PP1(self): + response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1", + "ROADM-C1", "1", "SRG1-PP1-TXRX") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Xponder Roadm Link created successfully', + res["output"]["result"]) + time.sleep(2) + + def test_010_connect_roadmC_PP1_to_spdrC_1_N1(self): + response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1", + "ROADM-C1", "1", "SRG1-PP1-TXRX") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Roadm Xponder links created successfully', + res["output"]["result"]) + time.sleep(2) + + def test_011_connect_sprdB_2_N1_to_roadmB_PP1(self): + response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "1", + "ROADM-B1", "1", "SRG1-PP1-TXRX") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Xponder Roadm Link created successfully', + res["output"]["result"]) + time.sleep(2) + + def test_012_connect_roadmB_PP1_to_spdrB_2_N1(self): + response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "1", + "ROADM-B1", "1", "SRG1-PP1-TXRX") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Roadm Xponder links created successfully', + res["output"]["result"]) + time.sleep(2) + + def test_013_connect_sprdB_2_N2_to_roadmB_PP2(self): + response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "2", + "ROADM-B1", "1", "SRG1-PP2-TXRX") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Xponder Roadm Link created successfully', + res["output"]["result"]) + time.sleep(2) + + def test_014_connect_roadmB_PP2_to_spdrB_2_N2(self): + response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "2", + "ROADM-B1", "1", "SRG1-PP2-TXRX") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Roadm Xponder links created successfully', + res["output"]["result"]) + time.sleep(2) + + def test_015_add_omsAttributes_ROADMA_ROADMB(self): + # Config ROADMA-ROADMC oms-attributes + data = {"span": { + "auto-spanloss": "true", + "spanloss-base": 11.4, + "spanloss-current": 12, + "engineered-spanloss": 12.2, + "link-concatenation": [{ + "SRLG-Id": 0, + "fiber-type": "smf", + "SRLG-length": 100000, + "pmd": 0.5}]}} + response = test_utils.add_oms_attr_request( + "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data) + self.assertEqual(response.status_code, requests.codes.created) + + def test_016_add_omsAttributes_ROADMB_ROADMA(self): + # Config ROADMC-ROADMA oms-attributes + data = {"span": { + "auto-spanloss": "true", + "spanloss-base": 11.4, + "spanloss-current": 12, + "engineered-spanloss": 12.2, + "link-concatenation": [{ + "SRLG-Id": 0, + "fiber-type": "smf", + "SRLG-length": 100000, + "pmd": 0.5}]}} + response = test_utils.add_oms_attr_request( + "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data) + self.assertEqual(response.status_code, requests.codes.created) + + def test_017_add_omsAttributes_ROADMB_ROADMC(self): + # Config ROADMA-ROADMC oms-attributes + data = {"span": { + "auto-spanloss": "true", + "spanloss-base": 11.4, + "spanloss-current": 12, + "engineered-spanloss": 12.2, + "link-concatenation": [{ + "SRLG-Id": 0, + "fiber-type": "smf", + "SRLG-length": 100000, + "pmd": 0.5}]}} + response = test_utils.add_oms_attr_request( + "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data) + self.assertEqual(response.status_code, requests.codes.created) + + def test_018_add_omsAttributes_ROADMC_ROADMB(self): + # Config ROADMC-ROADMA oms-attributes + data = {"span": { + "auto-spanloss": "true", + "spanloss-base": 11.4, + "spanloss-current": 12, + "engineered-spanloss": 12.2, + "link-concatenation": [{ + "SRLG-Id": 0, + "fiber-type": "smf", + "SRLG-length": 100000, + "pmd": 0.5}]}} + response = test_utils.add_oms_attr_request( + "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data) + self.assertEqual(response.status_code, requests.codes.created) + + def test_019_create_OTS_ROADMA_DEG1(self): + response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX") + time.sleep(10) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1', + res["output"]["result"]) + + def test_020_create_OTS_ROADMB_DEG1(self): + response = test_utils.create_ots_oms_request("ROADM-B1", "DEG1-TTP-TXRX") + time.sleep(10) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1', + res["output"]["result"]) + + def test_021_create_OTS_ROADMB_DEG2(self): + response = test_utils.create_ots_oms_request("ROADM-B1", "DEG2-TTP-TXRX") + time.sleep(10) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1', + res["output"]["result"]) + + def test_022_create_OTS_ROADMC_DEG2(self): + response = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX") + time.sleep(10) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1', + res["output"]["result"]) + + def test_023_calculate_span_loss_base_all(self): + url = "{}/operations/transportpce-olm:calculate-spanloss-base" + data = { + "input": { + "src-type": "all" + } + } + response = test_utils.post_request(url, data) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Success', + res["output"]["result"]) + self.assertIn({ + "spanloss": "25.7", + "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX" + }, res["output"]["spans"]) + self.assertIn({ + "spanloss": "17.6", + "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX" + }, res["output"]["spans"]) + self.assertIn({ + "spanloss": "23.6", + "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX" + }, res["output"]["spans"]) + self.assertIn({ + "spanloss": "23.6", + "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX" + }, res["output"]["spans"]) + self.assertIn({ + "spanloss": "25.7", + "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX" + }, res["output"]["spans"]) + self.assertIn({ + "spanloss": "17.6", + "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX" + }, res["output"]["spans"]) + time.sleep(5) + + def test_024_check_otn_topology(self): + response = test_utils.get_otn_topo_request() + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + nbNode = len(res['network'][0]['node']) + self.assertEqual(nbNode, 9, 'There should be 9 nodes') + self.assertNotIn('ietf-network-topology:link', res['network'][0], + 'otn-topology should have no link') + +# test service-create for OCH-OTU4 service from spdrA to spdrB + def test_025_create_OCH_OTU4_service_AB(self): + response = test_utils.service_create_request(self.cr_serv_sample_data) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('PCE calculation in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_026_get_OCH_OTU4_service_AB(self): + response = test_utils.get_service_list_request( + "services/service-OCH-OTU4-AB") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertEqual( + res['services'][0]['administrative-state'], 'inService') + self.assertEqual( + res['services'][0]['service-name'], 'service-OCH-OTU4-AB') + self.assertEqual( + res['services'][0]['connection-type'], 'infrastructure') + self.assertEqual( + res['services'][0]['lifecycle-state'], 'planned') + time.sleep(2) + + def test_027_check_otn_topo_otu4_links(self): + response = test_utils.get_otn_topo_request() + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + nb_links = len(res['network'][0]['ietf-network-topology:link']) + self.assertEqual(nb_links, 2) + listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1', + 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'] + for link in res['network'][0]['ietf-network-topology:link']: + self.assertIn(link['link-id'], listLinkId) + self.assertEqual( + link['transportpce-topology:otn-link-type'], 'OTU4') + self.assertEqual( + link['org-openroadm-common-network:link-type'], 'OTN-LINK') + self.assertEqual( + link['org-openroadm-otn-network-topology:available-bandwidth'], 100000) + self.assertEqual( + link['org-openroadm-otn-network-topology:used-bandwidth'], 0) + self.assertIn( + link['org-openroadm-common-network:opposite-link'], listLinkId) + +# test service-create for OCH-OTU4 service from spdrB to spdrC + def test_028_create_OCH_OTU4_service_BC(self): + # pylint: disable=line-too-long + self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-BC" + self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SB1" + self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSB" + self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2" + self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2" + self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2" + self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2" + self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1" + self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC" + self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1" + self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1" + self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + + response = test_utils.service_create_request(self.cr_serv_sample_data) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('PCE calculation in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_029_get_OCH_OTU4_service_BC(self): + response = test_utils.get_service_list_request( + "services/service-OCH-OTU4-BC") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertEqual( + res['services'][0]['administrative-state'], 'inService') + self.assertEqual( + res['services'][0]['service-name'], 'service-OCH-OTU4-BC') + self.assertEqual( + res['services'][0]['connection-type'], 'infrastructure') + self.assertEqual( + res['services'][0]['lifecycle-state'], 'planned') + time.sleep(2) + + def test_030_check_otn_topo_otu4_links(self): + response = test_utils.get_otn_topo_request() + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + nb_links = len(res['network'][0]['ietf-network-topology:link']) + self.assertEqual(nb_links, 4) + listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1', + 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1', + 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR1-XPDR1-NETWORK1', + 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2'] + for link in res['network'][0]['ietf-network-topology:link']: + self.assertIn(link['link-id'], listLinkId) + self.assertEqual( + link['transportpce-topology:otn-link-type'], 'OTU4') + self.assertEqual( + link['org-openroadm-common-network:link-type'], 'OTN-LINK') + self.assertEqual( + link['org-openroadm-otn-network-topology:available-bandwidth'], 100000) + self.assertEqual( + link['org-openroadm-otn-network-topology:used-bandwidth'], 0) + self.assertIn( + link['org-openroadm-common-network:opposite-link'], listLinkId) + +# test service-create for ODU4 service from spdrA to spdrC via spdrB + def test_031_create_ODU4_service(self): + # pylint: disable=line-too-long + self.cr_serv_sample_data["input"]["service-name"] = "service-ODU4-ABC" + self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1" + self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA" + self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU" + del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] + self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4" + self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1" + self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1" + self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1" + self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC" + self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU" + del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] + self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4" + self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1" + self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1" + self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + + response = test_utils.service_create_request(self.cr_serv_sample_data) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('PCE calculation in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_032_get_ODU4_service_ABC(self): + response = test_utils.get_service_list_request( + "services/service-ODU4-ABC") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertEqual( + res['services'][0]['administrative-state'], 'inService') + self.assertEqual( + res['services'][0]['service-name'], 'service-ODU4-ABC') + self.assertEqual( + res['services'][0]['connection-type'], 'infrastructure') + self.assertEqual( + res['services'][0]['lifecycle-state'], 'planned') + time.sleep(2) + + def test_033_check_interface_ODU4_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-CFP0', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-CFP0-P1'} + # SAPI/DAPI are added in the Otu4 renderer + input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP', + 'rate': 'org-openroadm-otn-common-types:ODU4', + 'expected-dapi': 'H/OelLynehI=', + 'expected-sapi': 'AMf1n5hK6Xkk', + 'tx-dapi': 'AMf1n5hK6Xkk', + 'tx-sapi': 'H/OelLynehI='} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], + **input_dict_2), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + ) + self.assertDictEqual( + {'payload-type': '21', 'exp-payload-type': '21'}, + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + response2 = test_utils.check_netconf_node_request( + "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi']) + self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi']) + + def test_034_check_interface_ODU4_spdrc(self): + response = test_utils.check_netconf_node_request( + "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-CFP0', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-CFP0-P1'} + # SAPI/DAPI are added in the Otu4 renderer + input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP', + 'rate': 'org-openroadm-otn-common-types:ODU4', + 'expected-dapi': 'AMf1n5hK6Xkk', + 'expected-sapi': 'H/OelLynehI=', + 'tx-dapi': 'H/OelLynehI=', + 'tx-sapi': 'AMf1n5hK6Xkk'} + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], + **input_dict_2), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + ) + self.assertDictEqual( + {'payload-type': '21', 'exp-payload-type': '21'}, + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + response2 = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi']) + self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi']) + + def test_035_check_interface_ODU4_NETWORK1_spdrb(self): + response = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP5-CFP', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP5-CFP-P1'} + input_dict_2 = { + 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP', + 'rate': 'org-openroadm-otn-common-types:ODU4', + 'monitoring-mode': 'monitored'} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertNotIn('opu', + dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])) + + def test_036_check_interface_ODU4_NETWORK2_spdrb(self): + response = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP6-CFP', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP6-CFP-P1'} + input_dict_2 = { + 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP', + 'rate': 'org-openroadm-otn-common-types:ODU4', + 'monitoring-mode': 'monitored'} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertNotIn('opu', + dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])) + + def test_037_check_ODU4_connection_spdrb(self): + response = test_utils.check_netconf_node_request( + "SPDR-SB1", + "odu-connection/XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = { + 'connection-name': + 'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4', + 'direction': 'bidirectional' + } + + self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]), + res['odu-connection'][0]) + self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU4'}, + res['odu-connection'][0]['destination']) + self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU4'}, + res['odu-connection'][0]['source']) + + def test_038_check_otn_topo_links(self): + response = test_utils.get_otn_topo_request() + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + nb_links = len(res['network'][0]['ietf-network-topology:link']) + self.assertEqual(nb_links, 6) + for link in res['network'][0]['ietf-network-topology:link']: + if 'OTU4' in link['link-id']: + self.assertEqual( + link['org-openroadm-otn-network-topology:available-bandwidth'], 0) + self.assertEqual( + link['org-openroadm-otn-network-topology:used-bandwidth'], 100000) + elif 'ODTU4' in link['link-id']: + self.assertEqual( + link['org-openroadm-otn-network-topology:available-bandwidth'], 100000) + self.assertEqual( + link['org-openroadm-otn-network-topology:used-bandwidth'], 0) + self.assertEqual( + link['transportpce-topology:otn-link-type'], 'ODTU4') + self.assertEqual( + link['org-openroadm-common-network:link-type'], 'OTN-LINK') + self.assertIn(link['org-openroadm-common-network:opposite-link'], + ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1', + 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']) + else: + self.fail("this link should not exist") + + def test_039_check_otn_topo_tp(self): + response = test_utils.get_otn_topo_request() + res = response.json() + for node in res['network'][0]['node']: + if node['node-id'] == 'SPDR-SA1-XPDR1' or node['node-id'] == 'SPDR-SC1-XPDR1': + tpList = node['ietf-network-topology:termination-point'] + for tp in tpList: + if tp['tp-id'] == 'XPDR1-NETWORK1': + xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes'] + self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80) + self.assertEqual( + len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80) + self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'], + 'org-openroadm-otn-common-types:ODTU4.ts-Allocated') + +# test service-create for 10GE service from spdr to spdr + def test_040_create_10GE_service(self): + self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE" + self.cr_serv_sample_data["input"]["connection-type"] = "service" + self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10" + self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet" + del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] + self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1" + self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1" + self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10" + self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet" + del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] + self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + response = test_utils.service_create_request(self.cr_serv_sample_data) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('PCE calculation in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_041_get_10GE_service1(self): + response = test_utils.get_service_list_request( + "services/service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertEqual( + res['services'][0]['administrative-state'], 'inService') + self.assertEqual( + res['services'][0]['service-name'], 'service1-10GE') + self.assertEqual( + res['services'][0]['connection-type'], 'service') + self.assertEqual( + res['services'][0]['lifecycle-state'], 'planned') + time.sleep(2) + + def test_042_check_interface_10GE_CLIENT_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-SFP4', + 'type': 'org-openroadm-interfaces:ethernetCsmacd', + 'supporting-port': 'CP1-SFP4-P1' + } + self.assertDictEqual(dict(input_dict, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual( + {'speed': 10000}, + res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']) + + def test_043_check_interface_ODU2E_CLIENT_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-SFP4', + 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-SFP4-P1'} + input_dict_2 = { + 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP', + 'rate': 'org-openroadm-otn-common-types:ODU2e', + 'monitoring-mode': 'terminated', + 'expected-dapi': 'B68VWipZAU0=', + 'expected-sapi': 'BcwI5xz79t8=', + 'tx-dapi': 'BcwI5xz79t8=', + 'tx-sapi': 'B68VWipZAU0='} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual( + {'payload-type': '03', 'exp-payload-type': '03'}, + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + response2 = test_utils.check_netconf_node_request( + "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi']) + self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi']) + + def test_044_check_interface_ODU2E_NETWORK_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-CFP0', + 'supporting-interface': 'XPDR1-NETWORK1-ODU4', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-CFP0-P1'} + input_dict_2 = { + 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP', + 'rate': 'org-openroadm-otn-common-types:ODU2e', + 'monitoring-mode': 'monitored'} + input_dict_3 = {'trib-port-number': 1} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual(dict(input_dict_3, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][ + 'parent-odu-allocation']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']) + self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'] + ['trib-slots']) + + def test_045_check_ODU2E_connection_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", + "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = { + 'connection-name': + 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE', + 'direction': 'bidirectional' + } + + self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]), + res['odu-connection'][0]) + self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'}, + res['odu-connection'][0]['destination']) + self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'}, + res['odu-connection'][0]['source']) + + def test_046_check_interface_10GE_CLIENT_spdrc(self): + response = test_utils.check_netconf_node_request( + "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-SFP4', + 'type': 'org-openroadm-interfaces:ethernetCsmacd', + 'supporting-port': 'CP1-SFP4-P1' + } + self.assertDictEqual(dict(input_dict, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual( + {'speed': 10000}, + res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']) + + def test_047_check_interface_ODU2E_CLIENT_spdrc(self): + response = test_utils.check_netconf_node_request( + "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-SFP4', + 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-SFP4-P1'} + input_dict_2 = { + 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP', + 'rate': 'org-openroadm-otn-common-types:ODU2e', + 'monitoring-mode': 'terminated', + 'expected-dapi': 'BcwI5xz79t8=', + 'expected-sapi': 'B68VWipZAU0=', + 'tx-dapi': 'B68VWipZAU0=', + 'tx-sapi': 'BcwI5xz79t8='} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual( + {'payload-type': '03', 'exp-payload-type': '03'}, + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + response2 = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi']) + self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi']) + + def test_048_check_interface_ODU2E_NETWORK_spdrc(self): + response = test_utils.check_netconf_node_request( + "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-CFP0', + 'supporting-interface': 'XPDR1-NETWORK1-ODU4', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-CFP0-P1'} + input_dict_2 = { + 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP', + 'rate': 'org-openroadm-otn-common-types:ODU2e', + 'monitoring-mode': 'monitored'} + input_dict_3 = {'trib-port-number': 1} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual(dict(input_dict_3, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][ + 'parent-odu-allocation']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][ + 'parent-odu-allocation']) + self.assertIn(1, + res['interface'][0][ + 'org-openroadm-otn-odu-interfaces:odu'][ + 'parent-odu-allocation']['trib-slots']) + + def test_049_check_ODU2E_connection_spdrc(self): + response = test_utils.check_netconf_node_request( + "SPDR-SC1", + "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = { + 'connection-name': + 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE', + 'direction': 'bidirectional' + } + + self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]), + res['odu-connection'][0]) + self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'}, + res['odu-connection'][0]['destination']) + self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'}, + res['odu-connection'][0]['source']) + + def test_050_check_otn_topo_links(self): + response = test_utils.get_otn_topo_request() + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + nb_links = len(res['network'][0]['ietf-network-topology:link']) + self.assertEqual(nb_links, 6) + for link in res['network'][0]['ietf-network-topology:link']: + linkId = link['link-id'] + if (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1', + 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')): + self.assertEqual( + link['org-openroadm-otn-network-topology:available-bandwidth'], 90000) + self.assertEqual( + link['org-openroadm-otn-network-topology:used-bandwidth'], 10000) + + def test_051_check_otn_topo_tp(self): + response = test_utils.get_otn_topo_request() + res = response.json() + for node in res['network'][0]['node']: + if node['node-id'] == 'SPDR-SA1-XPDR1' or node['node-id'] == 'SPDR-SC1-XPDR1': + tpList = node['ietf-network-topology:termination-point'] + for tp in tpList: + if tp['tp-id'] == 'XPDR1-NETWORK1': + xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes'] + self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72) + tsPoolList = list(range(1, 9)) + self.assertNotIn( + tsPoolList, xpdrTpPortConAt['ts-pool']) + self.assertEqual( + len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79) + self.assertNotIn( + 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']) + + def test_052_delete_10GE_service(self): + response = test_utils.service_delete_request("service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Renderer service delete in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_053_check_service_list(self): + response = test_utils.get_service_list_request("") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertEqual(len(res['service-list']['services']), 3) + time.sleep(2) + + def test_054_check_no_ODU2e_connection_spdra(self): + response = test_utils.check_netconf_node_request("SPDR-SA1", "") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertNotIn(['odu-connection'][0], res['org-openroadm-device']) + time.sleep(1) + + def test_055_check_no_interface_ODU2E_NETWORK_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1") + self.assertEqual(response.status_code, requests.codes.conflict) + + def test_056_check_no_interface_ODU2E_CLIENT_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1") + self.assertEqual(response.status_code, requests.codes.conflict) + + def test_057_check_no_interface_10GE_CLIENT_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G") + self.assertEqual(response.status_code, requests.codes.conflict) + + def test_058_check_otn_topo_links(self): + response = test_utils.get_otn_topo_request() + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + nb_links = len(res['network'][0]['ietf-network-topology:link']) + self.assertEqual(nb_links, 6) + for link in res['network'][0]['ietf-network-topology:link']: + linkId = link['link-id'] + if (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1', + 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')): + self.assertEqual( + link['org-openroadm-otn-network-topology:available-bandwidth'], 100000) + self.assertEqual( + link['org-openroadm-otn-network-topology:used-bandwidth'], 0) + + def test_059_check_otn_topo_tp(self): + response = test_utils.get_otn_topo_request() + res = response.json() + for node in res['network'][0]['node']: + if (node['node-id'] == 'SPDR-SA1-XPDR1' or node['node-id'] == 'SPDR-SC1-XPDR1'): + tpList = node['ietf-network-topology:termination-point'] + for tp in tpList: + if tp['tp-id'] == 'XPDR1-NETWORK1': + xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes'] + self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80) + self.assertEqual( + len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80) + + def test_060_delete_ODU4_service(self): + response = test_utils.service_delete_request("service-ODU4-ABC") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Renderer service delete in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_061_check_service_list(self): + response = test_utils.get_service_list_request("") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertEqual(len(res['service-list']['services']), 2) + time.sleep(2) + + def test_062_check_no_interface_ODU4_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.conflict) + + def test_063_check_no_interface_ODU4_spdrb(self): + response = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.conflict) + response = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4") + self.assertEqual(response.status_code, requests.codes.conflict) + + def test_064_check_no_ODU4_connection_spdrb(self): + response = test_utils.check_netconf_node_request("SPDR-SB1", "") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertNotIn(['odu-connection'][0], res['org-openroadm-device']) + time.sleep(1) + + def test_065_check_no_interface_ODU4_spdrc(self): + response = test_utils.check_netconf_node_request( + "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.conflict) + + def test_066_check_otn_topo_links(self): + self.test_030_check_otn_topo_otu4_links() + + def test_067_check_otn_topo_tp(self): + response = test_utils.get_otn_topo_request() + res = response.json() + for node in res['network'][0]['node']: + if node['node-id'] == 'SPDR-SA1-XPDR1' or node['node-id'] == 'SPDR-SC1-XPDR1': + tpList = node['ietf-network-topology:termination-point'] + for tp in tpList: + if tp['tp-id'] == 'XPDR1-NETWORK1': + xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes'] + self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt)) + self.assertNotIn( + 'odtu-tpn-pool', dict.keys(xpdrTpPortConAt)) + +# test service-create for ODU4 service from spdrA to spdrB + def test_068_create_ODU4_service_AB(self): + # pylint: disable=line-too-long + self.cr_serv_sample_data["input"]["service-name"] = "service-ODU4-AB" + self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure" + self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100" + self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU" + self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4" + self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1" + self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1" + self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SB1" + self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSB" + self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100" + self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU" + self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4" + self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2" + self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1" + self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2" + self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK1" + + response = test_utils.service_create_request(self.cr_serv_sample_data) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('PCE calculation in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_069_get_ODU4_service_AB(self): + response = test_utils.get_service_list_request( + "services/service-ODU4-AB") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertEqual( + res['services'][0]['administrative-state'], 'inService') + self.assertEqual( + res['services'][0]['service-name'], 'service-ODU4-AB') + self.assertEqual( + res['services'][0]['connection-type'], 'infrastructure') + self.assertEqual( + res['services'][0]['lifecycle-state'], 'planned') + time.sleep(2) + + def test_070_check_interface_ODU4_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-CFP0', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-CFP0-P1'} + # SAPI/DAPI are added in the Otu4 renderer + input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP', + 'rate': 'org-openroadm-otn-common-types:ODU4', + 'expected-dapi': 'H/OelLynehI=', + 'expected-sapi': 'X+8cRNi+HbE=', + 'tx-dapi': 'X+8cRNi+HbE=', + 'tx-sapi': 'H/OelLynehI='} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], + **input_dict_2), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + ) + self.assertDictEqual( + {'payload-type': '21', 'exp-payload-type': '21'}, + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + response2 = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi']) + self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi']) + + def test_071_check_interface_ODU4_spdrb_N1(self): + response = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-CFP0', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-CFP0-P1'} + # SAPI/DAPI are added in the Otu4 renderer + input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP', + 'rate': 'org-openroadm-otn-common-types:ODU4', + 'expected-dapi': 'X+8cRNi+HbE=', + 'expected-sapi': 'H/OelLynehI=', + 'tx-dapi': 'H/OelLynehI=', + 'tx-sapi': 'X+8cRNi+HbE='} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], + **input_dict_2), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + ) + self.assertDictEqual( + {'payload-type': '21', 'exp-payload-type': '21'}, + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + response2 = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi']) + self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi']) + +# test service-create for ODU4 service from spdrB to spdrC + def test_072_create_ODU4_service_BC(self): + # pylint: disable=line-too-long + self.cr_serv_sample_data["input"]["service-name"] = "service-ODU4-BC" + self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2" + self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2" + self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SB1-XPDR2" + self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR2-NETWORK2" + self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1" + self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC" + self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1" + self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR1" + self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-NETWORK1" + + response = test_utils.service_create_request(self.cr_serv_sample_data) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('PCE calculation in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_073_get_ODU4_service_AB(self): + response = test_utils.get_service_list_request( + "services/service-ODU4-BC") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertEqual( + res['services'][0]['administrative-state'], 'inService') + self.assertEqual( + res['services'][0]['service-name'], 'service-ODU4-BC') + self.assertEqual( + res['services'][0]['connection-type'], 'infrastructure') + self.assertEqual( + res['services'][0]['lifecycle-state'], 'planned') + time.sleep(2) + + def test_074_check_interface_ODU4_spdrb_N2(self): + response = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-CFP0', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-CFP0-P1'} + # SAPI/DAPI are added in the Otu4 renderer + input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP', + 'rate': 'org-openroadm-otn-common-types:ODU4', + 'expected-dapi': 'X+8cRNi+HbI=', + 'expected-sapi': 'AMf1n5hK6Xkk', + 'tx-dapi': 'AMf1n5hK6Xkk', + 'tx-sapi': 'X+8cRNi+HbI='} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], + **input_dict_2), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + ) + self.assertDictEqual( + {'payload-type': '21', 'exp-payload-type': '21'}, + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + response2 = test_utils.check_netconf_node_request( + "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi']) + self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi']) + + def test_075_check_interface_ODU4_spdrc(self): + response = test_utils.check_netconf_node_request( + "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-CFP0', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-CFP0-P1'} + # SAPI/DAPI are added in the Otu4 renderer + input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP', + 'rate': 'org-openroadm-otn-common-types:ODU4', + 'expected-dapi': 'AMf1n5hK6Xkk', + 'expected-sapi': 'X+8cRNi+HbI=', + 'tx-dapi': 'X+8cRNi+HbI=', + 'tx-sapi': 'AMf1n5hK6Xkk'} + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], + **input_dict_2), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + ) + self.assertDictEqual( + {'payload-type': '21', 'exp-payload-type': '21'}, + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + response2 = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4") + self.assertEqual(response.status_code, requests.codes.ok) + res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi']) + self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi']) + self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi']) + +# test service-create for 10GE service from spdr to spdr + def test_076_create_10GE_service_ABC(self): + # pylint: disable=line-too-long + self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE" + self.cr_serv_sample_data["input"]["connection-type"] = "service" + self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SA1" + self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSA" + self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10" + self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet" + del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] + self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1" + self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1" + self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR1" + self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1" + self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10" + self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet" + del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] + self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1" + self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR1-CLIENT1" + response = test_utils.service_create_request(self.cr_serv_sample_data) + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('PCE calculation in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_077_check_configuration_spdra_spdrc(self): + self.test_041_get_10GE_service1() + self.test_042_check_interface_10GE_CLIENT_spdra() + self.test_043_check_interface_ODU2E_CLIENT_spdra() + self.test_044_check_interface_ODU2E_NETWORK_spdra() + self.test_045_check_ODU2E_connection_spdra() + self.test_046_check_interface_10GE_CLIENT_spdrc() + self.test_047_check_interface_ODU2E_CLIENT_spdrc() + self.test_049_check_ODU2E_connection_spdrc() + + def test_078_check_interface_ODU2E_NETWORK1_spdrb(self): + response = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU2e-service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU2e-service1-10GE', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-CFP0', + 'supporting-interface': 'XPDR2-NETWORK1-ODU4', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-CFP0-P1'} + input_dict_2 = { + 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP', + 'rate': 'org-openroadm-otn-common-types:ODU2e', + 'monitoring-mode': 'monitored'} + input_dict_3 = {'trib-port-number': 1} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual(dict(input_dict_3, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][ + 'parent-odu-allocation']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']) + self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'] + ['trib-slots']) + + def test_079_check_interface_ODU2E_NETWORK2_spdrb(self): + response = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU2e-service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = {'name': 'XPDR2-NETWORK2-ODU2e-service1-10GE', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-CFP0', + 'supporting-interface': 'XPDR2-NETWORK2-ODU4', + 'type': 'org-openroadm-interfaces:otnOdu', + 'supporting-port': 'CP1-CFP0-P1'} + input_dict_2 = { + 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP', + 'rate': 'org-openroadm-otn-common-types:ODU2e', + 'monitoring-mode': 'monitored'} + input_dict_3 = {'trib-port-number': 1} + + self.assertDictEqual(dict(input_dict_1, **res['interface'][0]), + res['interface'][0]) + self.assertDictEqual(dict(input_dict_2, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual(dict(input_dict_3, + **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][ + 'parent-odu-allocation']), + res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']) + self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'] + ['trib-slots']) + + def test_080_check_ODU2E_connection_spdrb(self): + response = test_utils.check_netconf_node_request( + "SPDR-SB1", + "odu-connection/XPDR2-NETWORK1-ODU2e-service1-10GE-x-XPDR2-NETWORK2-ODU2e-service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + input_dict_1 = { + 'connection-name': + 'XPDR2-NETWORK1-ODU2e-service1-10GE-x-XPDR2-NETWORK2-ODU2e-service1-10GE', + 'direction': 'bidirectional' + } + + self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]), + res['odu-connection'][0]) + self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU2e-service1-10GE'}, + res['odu-connection'][0]['destination']) + self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU2e-service1-10GE'}, + res['odu-connection'][0]['source']) + + def test_081_check_otn_topo_links(self): + response = test_utils.get_otn_topo_request() + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + nb_links = len(res['network'][0]['ietf-network-topology:link']) + self.assertEqual(nb_links, 8) + for link in res['network'][0]['ietf-network-topology:link']: + linkId = link['link-id'] + if (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1', + 'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1', + 'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR1-XPDR1-NETWORK1', + 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2',)): + self.assertEqual( + link['org-openroadm-otn-network-topology:available-bandwidth'], 90000) + self.assertEqual( + link['org-openroadm-otn-network-topology:used-bandwidth'], 10000) + + def test_082_check_otn_topo_tp(self): + self.test_051_check_otn_topo_tp() + + def test_083_delete_10GE_service(self): + response = test_utils.service_delete_request("service1-10GE") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Renderer service delete in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_084_check_service_list(self): + response = test_utils.get_service_list_request("") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertEqual(len(res['service-list']['services']), 4) + time.sleep(2) + + def test_085_check_configuration_spdra(self): + self.test_054_check_no_ODU2e_connection_spdra() + self.test_055_check_no_interface_ODU2E_NETWORK_spdra() + self.test_056_check_no_interface_ODU2E_CLIENT_spdra() + self.test_057_check_no_interface_10GE_CLIENT_spdra() + + def test_086_check_no_ODU2e_connection_spdrb(self): + response = test_utils.check_netconf_node_request("SPDR-SB1", "") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertNotIn(['odu-connection'][0], res['org-openroadm-device']) + time.sleep(1) + + def test_087_check_no_interface_ODU2E_NETWORK1_spdrb(self): + response = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU2e-service1") + self.assertEqual(response.status_code, requests.codes.conflict) + + def test_088_check_no_interface_ODU2E_NETWORK2_spdrb(self): + response = test_utils.check_netconf_node_request( + "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU2e-service1") + self.assertEqual(response.status_code, requests.codes.conflict) + + def test_089_check_otn_topo_links(self): + response = test_utils.get_otn_topo_request() + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + nb_links = len(res['network'][0]['ietf-network-topology:link']) + self.assertEqual(nb_links, 8) + for link in res['network'][0]['ietf-network-topology:link']: + linkId = link['link-id'] + if (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1', + 'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1', + 'ODTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR1-XPDR1-NETWORK1', + 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2',)): + self.assertEqual( + link['org-openroadm-otn-network-topology:available-bandwidth'], 100000) + self.assertEqual( + link['org-openroadm-otn-network-topology:used-bandwidth'], 0) + + def test_090_check_otn_topo_tp(self): + self.test_059_check_otn_topo_tp() + + def test_091_delete_ODU4_service_AB(self): + response = test_utils.service_delete_request("service-ODU4-AB") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Renderer service delete in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_092_delete_ODU4_service_BC(self): + response = test_utils.service_delete_request("service-ODU4-BC") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Renderer service delete in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_093_check_global_config(self): + self.test_061_check_service_list() + self.test_062_check_no_interface_ODU4_spdra() + self.test_063_check_no_interface_ODU4_spdrb() + self.test_064_check_no_ODU4_connection_spdrb() + self.test_065_check_no_interface_ODU4_spdrc() + self.test_066_check_otn_topo_links() + self.test_067_check_otn_topo_tp() + + def test_094_delete_OCH_OTU4_service_AB(self): + response = test_utils.service_delete_request("service-OCH-OTU4-AB") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Renderer service delete in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_095_delete_OCH_OTU4_service_BC(self): + response = test_utils.service_delete_request("service-OCH-OTU4-BC") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertIn('Renderer service delete in progress', + res['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) + + def test_096_get_no_service(self): + response = test_utils.get_service_list_request("") + self.assertEqual(response.status_code, requests.codes.conflict) + res = response.json() + self.assertIn( + {"error-type": "application", "error-tag": "data-missing", + "error-message": "Request could not be completed because the relevant data model content does not exist"}, + res['errors']['error']) + time.sleep(1) + + def test_097_check_no_interface_OTU4_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU") + self.assertEqual(response.status_code, requests.codes.conflict) + + def test_098_check_no_interface_OCH_spdra(self): + response = test_utils.check_netconf_node_request( + "SPDR-SA1", "interface/XPDR1-NETWORK1-1") + self.assertEqual(response.status_code, requests.codes.conflict) + + def test_099_getLinks_OtnTopology(self): + response = test_utils.get_otn_topo_request() + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + self.assertNotIn('ietf-network-topology:link', res['network'][0]) + + def test_100_disconnect_xponders_from_roadm(self): + url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/" + response = test_utils.get_ordm_topo_request("") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + links = res['network'][0]['ietf-network-topology:link'] + for link in links: + if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or + link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"): + link_name = link["link-id"] + response = test_utils.delete_request(url+link_name) + self.assertEqual(response.status_code, requests.codes.ok) + + def test_101_check_openroadm_topology(self): + response = test_utils.get_ordm_topo_request("") + self.assertEqual(response.status_code, requests.codes.ok) + res = response.json() + links = res['network'][0]['ietf-network-topology:link'] + self.assertEqual(28, len(links), 'Topology should contain 28 links') + + def test_102_disconnect_spdrA(self): + response = test_utils.unmount_device("SPDR-SA1") + self.assertEqual(response.status_code, requests.codes.ok, + test_utils.CODE_SHOULD_BE_200) + + def test_103_disconnect_spdrC(self): + response = test_utils.unmount_device("SPDR-SC1") + self.assertEqual(response.status_code, requests.codes.ok, + test_utils.CODE_SHOULD_BE_200) + + def test_104_disconnect_spdrB(self): + response = test_utils.unmount_device("SPDR-SB1") + self.assertEqual(response.status_code, requests.codes.ok, + test_utils.CODE_SHOULD_BE_200) + + def test_105_disconnect_roadmA(self): + response = test_utils.unmount_device("ROADM-A1") + self.assertEqual(response.status_code, requests.codes.ok, + test_utils.CODE_SHOULD_BE_200) + + def test_106_disconnect_roadmB(self): + response = test_utils.unmount_device("ROADM-B1") + self.assertEqual(response.status_code, requests.codes.ok, + test_utils.CODE_SHOULD_BE_200) + + def test_107_disconnect_roadmC(self): + response = test_utils.unmount_device("ROADM-C1") + self.assertEqual(response.status_code, requests.codes.ok, + test_utils.CODE_SHOULD_BE_200) + + +if __name__ == "__main__": + unittest.main(verbosity=2)