sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils_rfc8040.start_tpce()
- cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
- ('spdrb', cls.NODE_VERSION),
- ('spdrc', cls.NODE_VERSION),
- ('roadma', cls.NODE_VERSION),
- ('roadmb', cls.NODE_VERSION),
- ('roadmc', cls.NODE_VERSION)])
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
+ ('spdrb', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmb', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(2)
def test_001_connect_spdrA(self):
- response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_002_connect_spdrB(self):
- response = test_utils_rfc8040.mount_device("SPDR-SB1", ('spdrb', self.NODE_VERSION))
+ response = test_utils.mount_device("SPDR-SB1", ('spdrb', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_003_connect_spdrC(self):
- response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+ response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_004_connect_rdmA(self):
- response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_005_connect_rdmB(self):
- response = test_utils_rfc8040.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
+ response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_006_connect_rdmC(self):
- response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_007_connect_sprdA_2_N1_to_roadmA_PP3(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
time.sleep(2)
def test_008_connect_roadmA_PP3_to_spdrA_2_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
time.sleep(2)
def test_009_connect_sprdC_2_N1_to_roadmC_PP3(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '2', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
time.sleep(2)
def test_010_connect_roadmC_PP3_to_spdrC_2_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '2', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
time.sleep(2)
def test_011_connect_sprdB_2_N1_to_roadmB_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SB1', 'xpdr-num': '2', 'network-num': '1',
'rdm-node': 'ROADM-B1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_012_connect_roadmB_PP1_to_spdrB_2_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SB1', 'xpdr-num': '2', 'network-num': '1',
'rdm-node': 'ROADM-B1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_013_connect_sprdB_2_N2_to_roadmB_PP2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SB1', 'xpdr-num': '2', 'network-num': '2',
'rdm-node': 'ROADM-B1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
time.sleep(2)
def test_014_connect_roadmB_PP2_to_spdrB_2_N2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SB1', 'xpdr-num': '2', 'network-num': '2',
'rdm-node': 'ROADM-B1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request("ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX",
- data)
+ response = test_utils.add_oms_attr_request("ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX",
+ data)
self.assertEqual(response.status_code, requests.codes.created)
def test_016_add_omsAttributes_ROADMB_ROADMA(self):
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request("ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX",
- data)
+ response = test_utils.add_oms_attr_request("ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX",
+ data)
self.assertEqual(response.status_code, requests.codes.created)
def test_017_add_omsAttributes_ROADMB_ROADMC(self):
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request("ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX",
- data)
+ response = test_utils.add_oms_attr_request("ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX",
+ data)
self.assertEqual(response.status_code, requests.codes.created)
def test_018_add_omsAttributes_ROADMC_ROADMB(self):
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request("ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX",
- data)
+ response = test_utils.add_oms_attr_request("ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX",
+ data)
self.assertEqual(response.status_code, requests.codes.created)
def test_019_create_OTS_ROADMA_DEG1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'create-ots-oms',
{
'node-id': 'ROADM-A1',
response["output"]["result"])
def test_020_create_OTS_ROADMB_DEG1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'create-ots-oms',
{
'node-id': 'ROADM-B1',
response["output"]["result"])
def test_021_create_OTS_ROADMB_DEG2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'create-ots-oms',
{
'node-id': 'ROADM-B1',
response["output"]["result"])
def test_022_create_OTS_ROADMC_DEG2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'create-ots-oms',
{
'node-id': 'ROADM-C1',
response["output"]["result"])
def test_023_calculate_span_loss_base_all(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-olm', 'calculate-spanloss-base',
{
'src-type': 'all'
time.sleep(5)
def test_024_check_otn_topology(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['node']), 9, 'There should be 9 nodes')
self.assertNotIn('ietf-network-topology:link', response['network'][0],
# test service-create for OCH-OTU4 service from spdrA to spdrB
def test_025_create_OCH_OTU4_service_AB(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_026_get_OCH_OTU4_service_AB(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-AB")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-AB")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service-OCH-OTU4-AB')
# Check correct configuration of devices
def test_027_check_interface_och_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-761:768')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
def test_028_check_interface_OTU4_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
- response2 = test_utils_rfc8040.check_node_attribute2_request(
+ response2 = test_utils.check_node_attribute2_request(
'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
self.assertEqual(response2['status_code'], requests.codes.ok)
self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
def test_029_check_interface_och_spdrB(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-761:768')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
def test_030_check_interface_OTU4_spdrB(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-OTU')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
- response2 = test_utils_rfc8040.check_node_attribute2_request(
+ response2 = test_utils.check_node_attribute2_request(
'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
self.assertEqual(response2['status_code'], requests.codes.ok)
self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
def test_031_check_no_interface_ODU4_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(response['interface'], (
}))
def test_032_check_openroadm_topo_spdra(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR2', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR2', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
ele = response['node']['ietf-network-topology:termination-point'][0]
self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
time.sleep(1)
def test_033_check_openroadm_topo_ROADMA_SRG(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_034_check_openroadm_topo_ROADMA_DEG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_035_check_otn_topo_otu4_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
# test service-create for OCH-OTU4 service from spdrB to spdrC
-
def test_036_create_OCH_OTU4_service_BC(self):
self.cr_serv_input_data["service-name"] = "service-OCH-OTU4-BC"
self.cr_serv_input_data["service-a-end"]["node-id"] = "SPDR-SB1"
self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_037_get_OCH_OTU4_service_BC(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-BC")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-BC")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service-OCH-OTU4-BC')
# Check correct configuration of devices
def test_038_check_interface_och_spdrB(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-753:760')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
def test_039_check_interface_OTU4_spdrB(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-OTU')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK2-OTU',
self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
- response2 = test_utils_rfc8040.check_node_attribute2_request(
+ response2 = test_utils.check_node_attribute2_request(
'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
self.assertEqual(response2['status_code'], requests.codes.ok)
self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
def test_040_check_interface_och_spdrC(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-753:760')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
def test_041_check_interface_OTU4_spdrC(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
- response2 = test_utils_rfc8040.check_node_attribute2_request(
+ response2 = test_utils.check_node_attribute2_request(
'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-OTU', 'org-openroadm-otn-otu-interfaces:otu')
self.assertEqual(response2['status_code'], requests.codes.ok)
self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
def test_042_check_no_interface_ODU4_spdrB(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(response['interface'], (
}))
def test_043_check_openroadm_topo_spdrB(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SB1-XPDR2', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'SPDR-SB1-XPDR2', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
liste_tp = response['node']['ietf-network-topology:termination-point']
# pylint: disable=consider-using-f-string
time.sleep(1)
def test_044_check_openroadm_topo_ROADMB_SRG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-SRG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-SRG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_045_check_openroadm_topo_ROADMB_DEG2(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG2', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG2', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_046_check_otn_topo_otu4_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
# test service-create for 100GE service from spdrA to spdrC via spdrB
+
def test_047_create_100GE_service_ABC(self):
self.cr_serv_input_data["service-name"] = "service-100GE-ABC"
self.cr_serv_input_data["connection-type"] = "service"
self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_048_get_100GE_service_ABC(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-100GE-ABC")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service-100GE-ABC")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service-100GE-ABC')
time.sleep(1)
def test_049_check_interface_100GE_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ETHERNET')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
self.assertEqual('off', response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['fec'])
def test_050_check_interface_ODU4_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
{'payload-type': '21', 'exp-payload-type': '21'},
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
- response2 = test_utils_rfc8040.check_node_attribute_request(
+ response2 = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR2-CLIENT1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(input_dict_2['tx-sapi'],
response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['expected-dapi'])
def test_051_check_interface_ODU4_NETWORK_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
def test_052_check_ODU4_connection_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
response['odu-connection'][0]['source'])
def test_053_check_interface_100GE_CLIENT_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR2-CLIENT1-ETHERNET')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
self.assertEqual('off', response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['fec'])
def test_054_check_interface_ODU4_CLIENT_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR2-CLIENT1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
{'payload-type': '21', 'exp-payload-type': '21'},
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
- response2 = test_utils_rfc8040.check_node_attribute_request(
+ response2 = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(input_dict_2['tx-sapi'],
response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['expected-dapi'])
def test_055_check_interface_ODU4_NETWORK_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
def test_056_check_ODU4_connection_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
response['odu-connection'][0]['source'])
def test_057_check_interface_ODU4_NETWORK1_spdrb(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
def test_058_check_interface_ODU4_NETWORK2_spdrb(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK2-ODU4',
dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
def test_059_check_ODU4_connection_spdrb(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SB1', 'odu-connection', 'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
response['odu-connection'][0]['source'])
def test_060_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
for link in response['network'][0]['ietf-network-topology:link']:
def test_061_delete_service_100GE_ABC(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE-ABC"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_062_check_service_list(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['service-list']['services']), 2)
time.sleep(1)
def test_063_check_no_ODU4_connection_spdra(self):
- response = test_utils_rfc8040.check_node_request("SPDR-SA1")
+ response = test_utils.check_node_request("SPDR-SA1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
time.sleep(1)
def test_064_check_no_interface_ODU4_NETWORK_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_065_check_no_interface_ODU4_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ODU4')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_066_check_no_interface_100GE_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ETHERNET')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_068_delete_OCH_OTU4_service_AB(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-OCH-OTU4-AB"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
def test_069_delete_OCH_OTU4_service_BC(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-OCH-OTU4-BC"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_070_get_no_service(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(response['service-list'], (
{
time.sleep(1)
def test_071_check_no_interface_OTU4_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_072_check_no_interface_OCH_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-761:768')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_073_getLinks_OtnTopology(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertNotIn('ietf-network-topology:link', response['network'][0])
def test_074_check_openroadm_topo_spdra(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR2', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR2', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
tp = response['node']['ietf-network-topology:termination-point'][0]
self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
time.sleep(1)
def test_075_check_openroadm_topo_ROADMB_SRG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-SRG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-SRG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_076_check_openroadm_topo_ROADMB_DEG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_077_check_openroadm_topo_ROADMB_DEG2(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG2', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG2', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_078_disconnect_xponders_from_roadm(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
links = response['network'][0]['ietf-network-topology:link']
for link in links:
if (link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT')
and ('SPDR-SB1' in link['link-id'] or 'ROADM-B1' in link['link-id'])):
- response = test_utils_rfc8040.del_ietf_network_link_request(
+ response = test_utils.del_ietf_network_link_request(
'openroadm-topology', link['link-id'], 'config')
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_079_disconnect_spdrB(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SB1")
+ response = test_utils.unmount_device("SPDR-SB1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_080_disconnect_roadmB(self):
- response = test_utils_rfc8040.unmount_device("ROADM-B1")
+ response = test_utils.unmount_device("ROADM-B1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_081_remove_roadm_to_roadm_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
links = response['network'][0]['ietf-network-topology:link']
for link in links:
if (link["org-openroadm-common-network:link-type"] == "ROADM-TO-ROADM"
and 'ROADM-B1' in link['link-id']):
- response = test_utils_rfc8040.del_ietf_network_link_request(
+ response = test_utils.del_ietf_network_link_request(
'openroadm-topology', link['link-id'], 'config')
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX",
- data)
+ response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX",
+ data)
self.assertEqual(response.status_code, requests.codes.created)
def test_083_add_omsAttributes_ROADMC_ROADMA(self):
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX",
- data)
+ response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX",
+ data)
self.assertEqual(response.status_code, requests.codes.created)
def test_084_create_OCH_OTU4_service_AC(self):
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
self.cr_serv_input_data["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_085_get_OCH_OTU4_service_AC(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-AC")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-AC")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service-OCH-OTU4-AC')
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_087_get_100GE_service_AC(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-100GE-AC")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service-100GE-AC")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service-100GE-AC')
time.sleep(1)
def test_088_check_interface_OTU4_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
- response2 = test_utils_rfc8040.check_node_attribute2_request(
+ response2 = test_utils.check_node_attribute2_request(
'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
self.assertEqual(response2['status_code'], requests.codes.ok)
self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
def test_089_check_interface_OTU4_spdrC(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
- response2 = test_utils_rfc8040.check_node_attribute2_request(
+ response2 = test_utils.check_node_attribute2_request(
'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
self.assertEqual(response2['status_code'], requests.codes.ok)
self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
self.test_056_check_ODU4_connection_spdrc()
def test_092_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
for link in response['network'][0]['ietf-network-topology:link']:
def test_093_delete_100GE_service_AC(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE-AC"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_094_check_service_list(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['service-list']['services']), 1)
time.sleep(1)
self.test_066_check_no_interface_100GE_CLIENT_spdra()
def test_096_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
for link in response['network'][0]['ietf-network-topology:link']:
link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_097_disconnect_xponders_from_roadm(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
links = response['network'][0]['ietf-network-topology:link']
for link in links:
if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
- response = test_utils_rfc8040.del_ietf_network_link_request(
+ response = test_utils.del_ietf_network_link_request(
'openroadm-topology', link['link-id'], 'config')
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_098_disconnect_spdrA(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ response = test_utils.unmount_device("SPDR-SA1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_099_disconnect_spdrC(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SC1")
+ response = test_utils.unmount_device("SPDR-SC1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_100_disconnect_roadmA(self):
- response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ response = test_utils.unmount_device("ROADM-A1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_101_disconnect_roadmC(self):
- response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ response = test_utils.unmount_device("ROADM-C1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))