sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils_rfc8040.start_tpce()
- cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION_71),
- ('roadma', cls.NODE_VERSION_221),
- ('roadmc', cls.NODE_VERSION_221),
- ('xpdrc2', cls.NODE_VERSION_71)])
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
+ ('roadma', cls.NODE_VERSION_221),
+ ('roadmc', cls.NODE_VERSION_221),
+ ('xpdrc2', cls.NODE_VERSION_71)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(1)
def test_01_connect_xpdra2(self):
- response = test_utils_rfc8040.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
+ response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrc2(self):
- response = test_utils_rfc8040.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
+ response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdma(self):
- response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmc(self):
- response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '2', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '2', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '2', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '2', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
# test service-create for OCH-OTU4 service from xpdra2 to xpdrc2
def test_11_check_otn_topology(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['node']), 6, 'There should be 6 nodes')
self.assertNotIn('ietf-network-topology:link', response['network'][0])
def test_12_create_OTUC4_service(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_13_get_OTUC4_service1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
+ response = test_utils.get_ordm_serv_list_attr_request(
"services", "service1-OTUC4")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
# Check correct configuration of devices
def test_14_check_interface_otsi_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-NETWORK1-755:768')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
def test_15_check_interface_OTSI_GROUP_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
['org-openroadm-otsi-group-interfaces:otsi-group'])
def test_16_check_interface_OTUC4_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTUC4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
['org-openroadm-otn-otu-interfaces:otu'])
def test_17_check_interface_otsi_xpdrc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-C2', 'interface', 'XPDR2-NETWORK1-755:768')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-C2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
['org-openroadm-otsi-group-interfaces:otsi-group'])
def test_19_check_interface_OTUC4_xpdrc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-C2', 'interface', 'XPDR2-NETWORK1-OTUC4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
['org-openroadm-otn-otu-interfaces:otu'])
def test_20_check_no_interface_ODUC4_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_21_check_openroadm_topo_xpdra2(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
ele = response['node']['ietf-network-topology:termination-point'][0]
self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
def test_22_check_otn_topo_OTUC4_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_24_get_ODUC4_service1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
+ response = test_utils.get_ordm_serv_list_attr_request(
"services", "service1-ODUC4")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['service-name'], 'service1-ODUC4')
'org-openroadm-otn-common-types:ODUCn')
def test_25_check_interface_ODUC4_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_26_check_interface_ODUC4_xpdrc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODUC4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_27_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
for link in response['network'][0]['ietf-network-topology:link']:
self.fail("this link should not exist")
def test_28_check_otn_topo_tp(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
for node in response['network'][0]['node']:
if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_30_get_100GE_service_1(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
+ response = test_utils.get_ordm_serv_list_attr_request(
"services", "service-100GE")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
def test_31_check_interface_100GE_CLIENT_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
['opucn-trib-slots'])
def test_34_check_ODU4_connection_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
response['odu-connection'][0]['source'])
def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-C2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-C2', 'interface', 'XPDR2-CLIENT1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
'parent-odu-allocation']['opucn-trib-slots'])
def test_38_check_ODU4_connection_xpdrc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-C2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
response['odu-connection'][0]['source'])
def test_39_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
for link in response['network'][0]['ietf-network-topology:link']:
link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
def test_40_check_otn_topo_tp(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
for node in response['network'][0]['node']:
if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_42_get_100GE_service_2(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-100GE2")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service-100GE2")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service-100GE2')
time.sleep(1)
def test_43_check_service_list(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['service-list']['services']), 4)
def test_44_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
for link in response['network'][0]['ietf-network-topology:link']:
link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
def test_45_check_otn_topo_tp(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
for node in response['network'][0]['node']:
if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
def test_46_delete_100GE_service_2(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE2"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
def test_47_delete_100GE_service_1(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_48_check_service_list(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['service-list']['services']), 2)
def test_49_check_no_ODU4_connection_xpdra2(self):
- response = test_utils_rfc8040.check_node_request("XPDR-A2")
+ response = test_utils.check_node_request("XPDR-A2")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_53_check_otn_topo_links(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
for link in response['network'][0]['ietf-network-topology:link']:
link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_54_check_otn_topo_tp(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
for node in response['network'][0]['node']:
if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
def test_55_delete_ODUC4_service(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODUC4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_56_check_service_list(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['service-list']['services']), 1)
def test_57_check_no_interface_ODU4_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
self.assertEqual(response['status_code'], requests.codes.conflict)
self.test_22_check_otn_topo_OTUC4_links()
def test_59_check_otn_topo_tp(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
for node in response['network'][0]['node']:
if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
def test_60_delete_OTUC4_service(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OTUC4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_61_get_no_service(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(response['service-list'], (
{
}))
def test_62_check_no_interface_OTUC4_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTUC4')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_63_check_no_interface_OTSI_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-NETWORK1-755:768')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_64_check_no_interface_OTSIG_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_65_getLinks_OtnTopology(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertNotIn('ietf-network-topology:link', response['network'][0])
def test_66_check_openroadm_topo_xpdra2(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
tp = response['node']['ietf-network-topology:termination-point'][0]
self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
tp['org-openroadm-network-topology:xpdr-network-attributes']))
def test_67_check_openroadm_topology(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
links = response['network'][0]['ietf-network-topology:link']
self.assertEqual(22, len(links), 'Topology should contain 22 links')
def test_68_connect_xprda2_1_N1_to_roadma_PP2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_69_connect_roadma_PP2_to_xpdra2_1_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
def test_70_connect_xprdc2_1_N1_to_roadmc_PP2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_71_connect_roadmc_PP2_to_xpdrc2_1_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
del self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"]
del self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"]
del self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"]
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_73_get_400GE_service(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-400GE")
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service-400GE")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], 'service-400GE')
time.sleep(1)
def test_74_check_xc1_roadma(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"ROADM-A1", "roadm-connections", "SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(
time.sleep(1)
def test_75_check_topo_xpdra2(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
liste_tp = response['node']['ietf-network-topology:termination-point']
for ele in liste_tp:
time.sleep(1)
def test_76_check_topo_roadma_SRG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_77_check_topo_roadma_DEG1(self):
- response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
+ response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
freq_map = base64.b64decode(
response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
time.sleep(1)
def test_78_check_interface_400GE_CLIENT_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR1-CLIENT1-ETHERNET')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_79_check_interface_OTSI_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR1-NETWORK1-755:768')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
def test_80_check_interface_OTSI_GROUP_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTSIGROUP-400G')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTSIGROUP-400G',
response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
def test_81_check_interface_OTUC4_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTUC4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_82_check_interface_ODUC4_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUC4')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
self.assertEqual('XPDR1-NETWORK1-OTUC4', response['interface'][0]['supporting-interface-list'][0])
def test_82a_check_interface_ODUFLEX_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUFLEX')
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUFLEX',
def test_83_delete_400GE_service(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-400GE"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_84_get_no_service(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(response['service-list'], (
{
time.sleep(1)
def test_85_check_no_interface_ODUC4_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUC4')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_86_check_no_interface_OTUC4_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTUC4')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTSIGROUP-400G')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_88_check_no_interface_OTSI_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR1-NETWORK1-755:768')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
'XPDR-A2', 'interface', 'XPDR1-CLIENT1-ETHERNET')
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_90_disconnect_xponders_from_roadm(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
links = response['network'][0]['ietf-network-topology:link']
for link in links:
if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
- response = test_utils_rfc8040.del_ietf_network_link_request(
+ response = test_utils.del_ietf_network_link_request(
'openroadm-topology', link['link-id'], 'config')
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_91_disconnect_xpdra2(self):
- response = test_utils_rfc8040.unmount_device("XPDR-A2")
+ response = test_utils.unmount_device("XPDR-A2")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_92_disconnect_xpdrc2(self):
- response = test_utils_rfc8040.unmount_device("XPDR-C2")
+ response = test_utils.unmount_device("XPDR-C2")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_93_disconnect_roadmA(self):
- response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ response = test_utils.unmount_device("ROADM-A1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_94_disconnect_roadmC(self):
- response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ response = test_utils.unmount_device("ROADM-C1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))