sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils_rfc8040.start_tpce()
- cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION_71),
- ('roadma', cls.NODE_VERSION_221),
- ('roadmb', cls.NODE_VERSION_221),
- ('roadmc', cls.NODE_VERSION_221),
- ('xpdrc2', cls.NODE_VERSION_71)])
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
+ ('roadma', cls.NODE_VERSION_221),
+ ('roadmb', cls.NODE_VERSION_221),
+ ('roadmc', cls.NODE_VERSION_221),
+ ('xpdrc2', cls.NODE_VERSION_71)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(1)
def test_01_connect_xpdra2(self):
- response = test_utils_rfc8040.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
+ response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrc2(self):
- response = test_utils_rfc8040.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
+ response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdma(self):
- response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmb(self):
- response = test_utils_rfc8040.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION_221))
+ response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION_221))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_rdmc(self):
- response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_06_connect_xprda2_1_N1_to_roadma_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_07_connect_roadma_PP1_to_xpdra2_1_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
def test_08_connect_xprdc2_1_N1_to_roadmc_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_09_connect_roadmc_PP1_to_xpdrc2_1_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
def test_10_connect_xprda2_3_N1_to_roadma_PP2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '3', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_11_connect_roadma_PP2_to_xpdra2_3_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '3', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
def test_12_connect_xprdc2_3_N1_to_roadmc_PP2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '3', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_13_connect_roadmc_PP2_to_xpdrc2_3_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '3', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_20_create_OTS_ROADMA_DEG1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'create-ots-oms',
{
'node-id': 'ROADM-A1',
response["output"]["result"])
def test_21_create_OTS_ROADMC_DEG2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'create-ots-oms',
{
'node-id': 'ROADM-C1',
response["output"]["result"])
def test_22_create_OTS_ROADMB_DEG1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'create-ots-oms',
{
'node-id': 'ROADM-B1',
response["output"]["result"])
def test_23_create_OTS_ROADMB_DEG2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'create-ots-oms',
{
'node-id': 'ROADM-B1',
response["output"]["result"])
def test_24_calculate_span_loss_base_all(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-olm', 'calculate-spanloss-base',
{
'src-type': 'all'
# test service-create for Eth service from xpdr to xpdr with service-resiliency
def test_25_create_eth_service1_with_service_resiliency_restorable(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_26_get_eth_service1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service1")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service1')
time.sleep(1)
def test_27_get_service_path_service_1(self):
- response = test_utils_rfc8040.get_serv_path_list_attr("service-paths", "service1")
+ response = test_utils.get_serv_path_list_attr("service-paths", "service1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertCountEqual(
self.service_path_service_1_AtoZ,
def test_28_create_eth_service2_without_service_resiliency(self):
self.cr_serv_input_data["service-name"] = "service2"
del self.cr_serv_input_data["service-resiliency"]
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_29_get_eth_service2(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service2")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service2")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service2')
time.sleep(1)
def test_30_get_service_path_service_2(self):
- response = test_utils_rfc8040.get_serv_path_list_attr("service-paths", "service2")
+ response = test_utils.get_serv_path_list_attr("service-paths", "service2")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertCountEqual(
self.service_path_service_2_AtoZ,
}
}
response = requests.request("POST", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
- timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.json()['output']['status-message'], "The PMs has been successfully set !")
time.sleep(self.WAITING * 2)
self.test_26_get_eth_service1()
def test_33_get_service_path_service_1(self):
- response = test_utils_rfc8040.get_serv_path_list_attr("service-paths", "service1")
+ response = test_utils.get_serv_path_list_attr("service-paths", "service1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertCountEqual(
self.service_path_service_1_rerouted_AtoZ,
response['service-paths'][0]['path-description']['aToZ-direction']['aToZ'])
def test_34_get_eth_service2(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service2")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service2")
self.assertEqual(response['services'][0]['operational-state'], 'outOfService')
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service2')
time.sleep(1)
def test_35_get_service_path_service_2(self):
- response = test_utils_rfc8040.get_serv_path_list_attr("service-paths", "service2")
+ response = test_utils.get_serv_path_list_attr("service-paths", "service2")
self.assertEqual(response['status_code'], requests.codes.ok)
index = self.service_path_service_2_AtoZ.index(
{
}
}
response = requests.request("POST", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
- timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.json()['output']['status-message'], "The PMs has been successfully released !")
time.sleep(2)
def test_41_delete_eth_service2(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
def test_42_delete_eth_service1(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_43_disconnect_xponders_from_roadm(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
links = response['network'][0]['ietf-network-topology:link']
for link in links:
if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
- response = test_utils_rfc8040.del_ietf_network_link_request(
+ response = test_utils.del_ietf_network_link_request(
'openroadm-topology', link['link-id'], 'config')
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_44_disconnect_xpdra2(self):
- response = test_utils_rfc8040.unmount_device("XPDR-A2")
+ response = test_utils.unmount_device("XPDR-A2")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_45_disconnect_xpdrc2(self):
- response = test_utils_rfc8040.unmount_device("XPDR-C2")
+ response = test_utils.unmount_device("XPDR-C2")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_46_disconnect_roadmA(self):
- response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ response = test_utils.unmount_device("ROADM-A1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_47_disconnect_roadmB(self):
- response = test_utils_rfc8040.unmount_device("ROADM-B1")
+ response = test_utils.unmount_device("ROADM-B1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_48_disconnect_roadmC(self):
- response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ response = test_utils.unmount_device("ROADM-C1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))