sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils_rfc8040.start_tpce()
- cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
- ('roadma', cls.NODE_VERSION),
- ('roadmc', cls.NODE_VERSION),
- ('spdrc', cls.NODE_VERSION)])
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(5)
def test_01_connect_spdrA(self):
- response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_spdrC(self):
- response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+ response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertEqual(response['status_code'], requests.codes.ok)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertEqual(response['status_code'], requests.codes.ok)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertEqual(response['status_code'], requests.codes.ok)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
# test service-create for OCH-OTU4 service from spdr to spdr
def test_11_check_otn_topology(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['node']), 6)
self.assertNotIn('ietf-network-topology:link', response['network'][0])
def test_12_create_OCH_OTU4_service(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_13_get_OCH_OTU4_service1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
+ response = test_utils.get_ordm_serv_list_attr_request(
"services", "service1-OCH-OTU4")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
# Check correct configuration of devices
def test_14_check_interface_och_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
def test_15_check_interface_OTU4_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_16_check_interface_och_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
def test_17_check_interface_OTU4_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_18_check_no_interface_ODU4_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_19_check_openroadm_topo_spdra(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
ele = response['node']['ietf-network-topology:termination-point'][0]
self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
def test_20_check_openroadm_topo_ROADMA_SRG(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
self.assertNotIn('avail-freq-maps', dict.keys(ele))
def test_21_check_openroadm_topo_ROADMA_DEG(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
def test_22_check_otn_topo_otu4_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_24_get_ODU4_service1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
+ response = test_utils.get_ordm_serv_list_attr_request(
"services", "service1-ODU4")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
response['services'][0]['lifecycle-state'], 'planned')
def test_25_check_interface_ODU4_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_26_check_interface_ODU4_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_27_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
for link in response['network'][0]['ietf-network-topology:link']:
self.fail("this link should not exist")
def test_28_check_otn_topo_tp(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
self.cr_serv_input_data["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_30_get_10GE_service1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
+ response = test_utils.get_ordm_serv_list_attr_request(
"services", "service1-10GE")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
response['services'][0]['lifecycle-state'], 'planned')
def test_31_check_interface_10GE_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_32_check_interface_ODU2E_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_33_check_interface_ODU2E_NETWORK_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
['trib-slots'])
def test_34_check_ODU2E_connection_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
response['odu-connection'][0]['source'])
def test_35_check_interface_10GE_CLIENT_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
'parent-odu-allocation']['trib-slots'])
def test_38_check_ODU2E_connection_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
response['odu-connection'][0]['source'])
def test_39_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
for link in response['network'][0]['ietf-network-topology:link']:
link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
def test_40_check_otn_topo_tp(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
def test_41_delete_10GE_service(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-10GE"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_42_check_service_list(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['service-list']['services']), 2)
def test_43_check_no_ODU2e_connection_spdra(self):
- response = test_utils_rfc8040.check_node_request("SPDR-SA1")
+ response = test_utils.check_node_request("SPDR-SA1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_47_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
for link in response['network'][0]['ietf-network-topology:link']:
link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_48_check_otn_topo_tp(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
def test_49_delete_ODU4_service(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_50_check_service_list(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['service-list']['services']), 1)
def test_51_check_no_interface_ODU4_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.conflict)
self.test_22_check_otn_topo_otu4_links()
def test_53_check_otn_topo_tp(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
def test_54_delete_OCH_OTU4_service(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_55_get_no_service(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_56_check_no_interface_OTU4_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_57_check_no_interface_OCH_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertNotIn('ietf-network-topology:link', response['network'][0])
def test_59_check_openroadm_topo_spdra(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
tp = response['node']['ietf-network-topology:termination-point'][0]
self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
tp['org-openroadm-network-topology:xpdr-network-attributes']))
def test_60_check_openroadm_topo_ROADMA_SRG(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(3)
def test_61_check_openroadm_topo_ROADMA_DEG(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(3)
def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
self.assertEqual(response['status_code'], requests.codes.ok)
def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
self.assertEqual(response['status_code'], requests.codes.ok)
def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
self.assertEqual(response['status_code'], requests.codes.ok)
def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
self.cr_serv_input_data["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_67_get_OCH_OTU4_service2(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
+ response = test_utils.get_ordm_serv_list_attr_request(
"services", "service2-OCH-OTU4")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_69_get_ODU4_service2(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
+ response = test_utils.get_ordm_serv_list_attr_request(
"services", "service2-ODU4")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_71_get_1GE_service1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
+ response = test_utils.get_ordm_serv_list_attr_request(
"services", "service1-1GE")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
response['services'][0]['lifecycle-state'], 'planned')
def test_72_check_interface_1GE_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_73_check_interface_ODU0_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_74_check_interface_ODU0_NETWORK_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
['trib-slots'])
def test_75_check_ODU0_connection_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
response['odu-connection'][0]['source'])
def test_76_check_interface_1GE_CLIENT_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_77_check_interface_ODU0_CLIENT_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_78_check_interface_ODU0_NETWORK_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
'parent-odu-allocation']['trib-slots'])
def test_79_check_ODU0_connection_spdrc(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
response['odu-connection'][0]['source'])
def test_80_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
for link in response['network'][0]['ietf-network-topology:link']:
link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
def test_81_check_otn_topo_tp(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
def test_82_delete_1GE_service(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-1GE"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_83_check_service_list(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['service-list']['services']), 2)
def test_84_check_no_ODU0_connection_spdra(self):
- response = test_utils_rfc8040.check_node_request("SPDR-SA1")
+ response = test_utils.check_node_request("SPDR-SA1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0-service1')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0-service1')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_87_check_no_interface_10GE_CLIENT_spdra(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_88_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
for link in response['network'][0]['ietf-network-topology:link']:
link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_89_check_otn_topo_tp(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
for node in response['network'][0]['node']:
if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
def test_90_delete_ODU4_service(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-ODU4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
def test_91_delete_OCH_OTU4_service(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-OCH-OTU4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_92_disconnect_xponders_from_roadm(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
links = response['network'][0]['ietf-network-topology:link']
for link in links:
if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
- response = test_utils_rfc8040.del_ietf_network_link_request(
+ response = test_utils.del_ietf_network_link_request(
'openroadm-topology', link['link-id'], 'config')
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_93_check_openroadm_topology(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(18,
len(response['network'][0]['ietf-network-topology:link']),
'Topology should contain 18 links')
def test_94_disconnect_spdrA(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ response = test_utils.unmount_device("SPDR-SA1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_95_disconnect_spdrC(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SC1")
+ response = test_utils.unmount_device("SPDR-SC1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_96_disconnect_roadmA(self):
- response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ response = test_utils.unmount_device("ROADM-A1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_97_disconnect_roadmC(self):
- response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ response = test_utils.unmount_device("ROADM-C1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))