sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
class TransportPCEFulltesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils_rfc8040.start_tpce()
- cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION_121),
- ('roadma', cls.NODE_VERSION_221),
- ('roadmc', cls.NODE_VERSION_221),
- ('xpdrc', cls.NODE_VERSION_71)])
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_121),
+ ('roadma', cls.NODE_VERSION_221),
+ ('roadmc', cls.NODE_VERSION_221),
+ ('xpdrc', cls.NODE_VERSION_71)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
time.sleep(3)
print("execution of {}".format(self.id().split(".")[-1]))
def test_01_connect_xpdrA(self):
- response = test_utils_rfc8040.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
+ response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
- response = test_utils_rfc8040.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
+ response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
# test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_12_get_eth_service1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
+ response = test_utils.get_ordm_serv_list_attr_request(
"services", "service1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
"administrative-state": "outOfService",
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
- data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
- timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_14_check_update_portmapping(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDRA01", None, None)
+ response = test_utils.get_portmapping_node_attr("XPDRA01", None, None)
self.assertEqual(response['status_code'], requests.codes.ok)
mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
time.sleep(1)
def test_15_check_update_openroadm_topo(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
node_list = response['network'][0]['node']
nb_updated_tp = 0
time.sleep(1)
def test_16_check_update_service1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service1")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['operational-state'], 'outOfService')
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
"administrative-state": "inService",
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
- data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
- timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_18_check_update_portmapping_ok(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDRA01", None, None)
+ response = test_utils.get_portmapping_node_attr("XPDRA01", None, None)
self.assertEqual(response['status_code'], requests.codes.ok)
mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
time.sleep(1)
def test_19_check_update_openroadm_topo_ok(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
node_list = response['network'][0]['node']
for node in node_list:
"administrative-state": "outOfService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
- timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_22_check_update_portmapping(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
+ response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
self.assertEqual(response['status_code'], requests.codes.ok)
mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
time.sleep(1)
def test_23_check_update_openroadm_topo(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
node_list = response['network'][0]['node']
nb_updated_tp = 0
"administrative-state": "inService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
- timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"administrative-state": "outOfService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
- timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_29_check_update_portmapping(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
+ response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
self.assertEqual(response['status_code'], requests.codes.ok)
mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
time.sleep(1)
def test_30_check_update_openroadm_topo(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
node_list = response['network'][0]['node']
nb_updated_tp = 0
"administrative-state": "inService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
- timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"administrative-state": "outOfService",
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
- data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
- timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_36_check_update_portmapping(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C1", None, None)
+ response = test_utils.get_portmapping_node_attr("XPDR-C1", None, None)
self.assertEqual(response['status_code'], requests.codes.ok)
mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
time.sleep(1)
def test_37_check_update_openroadm_topo(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
node_list = response['network'][0]['node']
nb_updated_tp = 0
"administrative-state": "inService",
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
- data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
- timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"administrative-state": "outOfService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
- timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_43_check_update_portmapping(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
+ response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
self.assertEqual(response['status_code'], requests.codes.ok)
mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
time.sleep(1)
def test_44_check_update_openroadm_topo(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
node_list = response['network'][0]['node']
nb_updated_tp = 0
def test_46_delete_eth_service1(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_47_disconnect_xponders_from_roadm(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
links = response['network'][0]['ietf-network-topology:link']
for link in links:
if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
- response = test_utils_rfc8040.del_ietf_network_link_request(
+ response = test_utils.del_ietf_network_link_request(
'openroadm-topology', link['link-id'], 'config')
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_48_disconnect_XPDRA(self):
- response = test_utils_rfc8040.unmount_device("XPDRA01")
+ response = test_utils.unmount_device("XPDRA01")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_49_disconnect_XPDRC(self):
- response = test_utils_rfc8040.unmount_device("XPDR-C1")
+ response = test_utils.unmount_device("XPDR-C1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_50_disconnect_ROADMA(self):
- response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ response = test_utils.unmount_device("ROADM-A1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_51_disconnect_ROADMC(self):
- response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ response = test_utils.unmount_device("ROADM-C1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))