sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
class TransportPCEFulltesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils_rfc8040.start_tpce()
- cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION),
- ('roadma-full', cls.NODE_VERSION),
- ('roadmc-full', cls.NODE_VERSION),
- ('xpdrc', cls.NODE_VERSION)])
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
+ ('roadma-full', cls.NODE_VERSION),
+ ('roadmc-full', cls.NODE_VERSION),
+ ('xpdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self): # instruction executed before each test method
# connect netconf devices
def test_01_connect_xpdrA(self):
- response = test_utils_rfc8040.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
- response = test_utils_rfc8040.mount_device("XPDRC01", ('xpdrc', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("XPDRC01", ('xpdrc', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils_rfc8040.mount_device("ROADMA01", ('roadma-full', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("ROADMA01", ('roadma-full', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- response = test_utils_rfc8040.mount_device("ROADMC01", ('roadmc-full', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("ROADMC01", ('roadmc-full', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDRC01', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADMC01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDRC01', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADMC01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX",
- data)
+ response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX",
+ data)
self.assertEqual(response.status_code, requests.codes.created)
def test_10_add_omsAttributes_ROADMC_ROADMA(self):
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX",
- data)
+ response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX",
+ data)
self.assertEqual(response.status_code, requests.codes.created)
# test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
self.cr_serv_input_data["service-name"] = "service1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_12_get_eth_service1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service1")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service1')
time.sleep(1)
def test_13_check_xc1_ROADMA(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"ROADMA01", "roadm-connections", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(1)
def test_14_check_xc1_ROADMC(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"ROADMC01", "roadm-connections", "SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(1)
def test_15_check_topo_XPDRA(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'XPDRA01-XPDR1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDRA01-XPDR1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
liste_tp = response['node']['ietf-network-topology:termination-point']
for ele in liste_tp:
time.sleep(1)
def test_16_check_topo_ROADMA_SRG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-SRG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-SRG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_17_check_topo_ROADMA_DEG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-DEG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-DEG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '2',
'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
time.sleep(2)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '2',
'rdm-node': 'ROADMA01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
time.sleep(2)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDRC01', 'xpdr-num': '1', 'network-num': '2',
'rdm-node': 'ROADMC01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
time.sleep(2)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDRC01', 'xpdr-num': '1', 'network-num': '2',
'rdm-node': 'ROADMC01', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
def test_22_create_eth_service2(self):
self.cr_serv_input_data["service-name"] = "service2"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_23_get_eth_service2(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service2")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service2")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service2')
time.sleep(1)
def test_24_check_xc2_ROADMA(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"ROADMA01", "roadm-connections", "DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(1)
def test_25_check_topo_XPDRA(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'XPDRA01-XPDR1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDRA01-XPDR1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
liste_tp = response['node']['ietf-network-topology:termination-point']
for ele in liste_tp:
time.sleep(1)
def test_26_check_topo_ROADMA_SRG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-SRG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-SRG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_27_check_topo_ROADMA_DEG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-DEG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-DEG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
# creation service test on a non-available resource
def test_28_create_eth_service3(self):
self.cr_serv_input_data["service-name"] = "service3"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
def test_29_delete_eth_service3(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service3"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
def test_30_delete_eth_service1(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
def test_31_delete_eth_service2(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_32_check_no_xc_ROADMA(self):
- response = test_utils_rfc8040.check_node_request("ROADMA01")
+ response = test_utils.check_node_request("ROADMA01")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertNotIn('roadm-connections',
dict.keys(response['org-openroadm-device']))
time.sleep(2)
def test_33_check_topo_XPDRA(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'XPDRA01-XPDR1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDRA01-XPDR1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
liste_tp = response['node']['ietf-network-topology:termination-point']
for ele in liste_tp:
time.sleep(1)
def test_34_check_topo_ROADMA_SRG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-SRG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-SRG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_35_check_topo_ROADMA_DEG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-DEG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-DEG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
self.cr_serv_input_data["service-a-end"]["service-format"] = "OC"
self.cr_serv_input_data["service-z-end"]["node-id"] = "ROADMC01"
self.cr_serv_input_data["service-z-end"]["service-format"] = "OC"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_37_get_oc_service1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service1")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service1')
time.sleep(1)
def test_38_check_xc1_ROADMA(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"ROADMA01", "roadm-connections", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(1)
def test_39_check_xc1_ROADMC(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"ROADMC01", "roadm-connections", "SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.cr_serv_input_data["service-a-end"]["service-format"] = "OC"
self.cr_serv_input_data["service-z-end"]["node-id"] = "ROADMC01"
self.cr_serv_input_data["service-z-end"]["service-format"] = "OC"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_41_get_oc_service2(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service2")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service2")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service2')
time.sleep(2)
def test_42_check_xc2_ROADMA(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"ROADMA01", "roadm-connections", "SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760")
self.assertEqual(response['status_code'], requests.codes.ok)
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
def test_44_delete_oc_service1(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
def test_45_delete_oc_service2(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_46_get_no_oc_services(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(response['service-list'], (
{
time.sleep(1)
def test_47_get_no_xc_ROADMA(self):
- response = test_utils_rfc8040.check_node_request("ROADMA01")
+ response = test_utils.check_node_request("ROADMA01")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertNotIn('roadm-connections', dict.keys(response['org-openroadm-device']))
time.sleep(1)
self.test_30_delete_eth_service1()
def test_50_loop_create_oc_service(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service1")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service1")
if response['status_code'] != 404:
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
time.sleep(5)
self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
- response = test_utils_rfc8040.unmount_device("XPDRA01")
+ response = test_utils.unmount_device("XPDRA01")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_52_disconnect_XPDRC(self):
- response = test_utils_rfc8040.unmount_device("XPDRC01")
+ response = test_utils.unmount_device("XPDRC01")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_53_disconnect_ROADMA(self):
- response = test_utils_rfc8040.unmount_device("ROADMA01")
+ response = test_utils.unmount_device("ROADMA01")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_54_disconnect_ROADMC(self):
- response = test_utils_rfc8040.unmount_device("ROADMC01")
+ response = test_utils.unmount_device("ROADMC01")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))