sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
class TransportPCEFulltesting(unittest.TestCase):
processes = None
- cr_serv_sample_data = {"input": {
+ cr_serv_input_data = {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
"rpc-action": "service-create",
"due-date": "2016-11-28T00:00:01Z",
"operator-contact": "pw1234"
}
+
+ del_serv_input_data = {
+ "sdnc-request-header": {
+ "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
+ "rpc-action": "service-delete",
+ "request-system-id": "appname",
+ "notification-url": "http://localhost:8585/NotificationServer/notify"},
+ "service-delete-req-info": {
+ "service-name": "TBD",
+ "tail-retention": "no"}
}
WAITING = 25 # nominal value is 300
@classmethod
def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_121),
- ('roadma', cls.NODE_VERSION_221),
- ('roadmc', cls.NODE_VERSION_221),
- ('xpdrc', cls.NODE_VERSION_71)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION_121),
+ ('roadma', cls.NODE_VERSION_221),
+ ('roadmc', cls.NODE_VERSION_221),
+ ('xpdrc', cls.NODE_VERSION_71)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
- time.sleep(10)
+ time.sleep(3)
def setUp(self): # instruction executed before each test method
# pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
def test_01_connect_xpdrA(self):
- response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION_121))
+ self.assertEqual(response.status_code,
+ requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
- response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_71))
+ self.assertEqual(response.status_code,
+ requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
+ self.assertEqual(response.status_code,
+ requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
+ self.assertEqual(response.status_code,
+ requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
- response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
- "ROADM-A1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
- response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
- "ROADM-A1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDRA01', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
- response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
- response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
def test_09_add_omsAttributes_ROADMA_ROADMC(self):
# Config ROADMA-ROADMC oms-attributes
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
+ response = test_utils_rfc8040.add_oms_attr_request(
+ "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_10_add_omsAttributes_ROADMC_ROADMA(self):
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
+ response = test_utils_rfc8040.add_oms_attr_request(
+ "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
# test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
- self.cr_serv_sample_data["input"]["service-name"] = "service1"
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ response['output']['configuration-response-common']['response-message'])
time.sleep(self.WAITING)
def test_12_get_eth_service1(self):
- response = test_utils.get_service_list_request("services/service1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['services'][0]['administrative-state'], 'inService')
- self.assertEqual(
- res['services'][0]['service-name'], 'service1')
- self.assertEqual(
- res['services'][0]['connection-type'], 'service')
- self.assertEqual(
- res['services'][0]['lifecycle-state'], 'planned')
- time.sleep(2)
+ response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
+ "services", "service1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['services'][0]['administrative-state'], 'inService')
+ self.assertEqual(response['services'][0]['connection-type'], 'service')
+ self.assertEqual(response['services'][0]['service-name'], 'service1')
+ self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
+ time.sleep(1)
def test_13_change_status_line_port_xpdra(self):
url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
"administrative-state": "outOfService",
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_14_check_update_portmapping(self):
- response = test_utils.portmapping_request("XPDRA01")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDRA01", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
self.assertEqual(mapping['port-oper-state'], 'OutOfService',
time.sleep(1)
def test_15_check_update_openroadm_topo(self):
- url = test_utils.URL_CONFIG_ORDM_TOPO
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- node_list = res['network'][0]['node']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ node_list = response['network'][0]['node']
nb_updated_tp = 0
for node in node_list:
self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
- link_list = res['network'][0]['ietf-network-topology:link']
+ link_list = response['network'][0]['ietf-network-topology:link']
updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
nb_updated_link = 0
time.sleep(1)
def test_16_check_update_service1(self):
- response = test_utils.get_service_list_request("services/service1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(res['services'][0]['operational-state'], 'outOfService')
- self.assertEqual(res['services'][0]['administrative-state'], 'inService')
+ response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['services'][0]['operational-state'], 'outOfService')
+ self.assertEqual(response['services'][0]['administrative-state'], 'inService')
time.sleep(1)
def test_17_restore_status_line_port_xpdra(self):
"administrative-state": "inService",
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_18_check_update_portmapping_ok(self):
- response = test_utils.portmapping_request("XPDRA01")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDRA01", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
self.assertEqual(mapping['port-oper-state'], 'InService',
"Operational State should be 'InService'")
time.sleep(1)
def test_19_check_update_openroadm_topo_ok(self):
- url = test_utils.URL_CONFIG_ORDM_TOPO
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- node_list = res['network'][0]['node']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ node_list = response['network'][0]['node']
for node in node_list:
self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
- link_list = res['network'][0]['ietf-network-topology:link']
+ link_list = response['network'][0]['ietf-network-topology:link']
for link in link_list:
self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
"administrative-state": "outOfService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_22_check_update_portmapping(self):
- response = test_utils.portmapping_request("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
self.assertEqual(mapping['port-oper-state'], 'OutOfService',
time.sleep(1)
def test_23_check_update_openroadm_topo(self):
- url = test_utils.URL_CONFIG_ORDM_TOPO
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- node_list = res['network'][0]['node']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ node_list = response['network'][0]['node']
nb_updated_tp = 0
for node in node_list:
self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
- link_list = res['network'][0]['ietf-network-topology:link']
+ link_list = response['network'][0]['ietf-network-topology:link']
updated_links = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
nb_updated_link = 0
"administrative-state": "inService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"administrative-state": "outOfService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_29_check_update_portmapping(self):
- response = test_utils.portmapping_request("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
self.assertEqual(mapping['port-oper-state'], 'OutOfService',
time.sleep(1)
def test_30_check_update_openroadm_topo(self):
- url = test_utils.URL_CONFIG_ORDM_TOPO
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- node_list = res['network'][0]['node']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ node_list = response['network'][0]['node']
nb_updated_tp = 0
for node in node_list:
self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
- link_list = res['network'][0]['ietf-network-topology:link']
+ link_list = response['network'][0]['ietf-network-topology:link']
updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
nb_updated_link = 0
"administrative-state": "inService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"administrative-state": "outOfService",
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_36_check_update_portmapping(self):
- response = test_utils.portmapping_request("XPDR-C1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C1", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
self.assertEqual(mapping['port-oper-state'], 'OutOfService',
time.sleep(1)
def test_37_check_update_openroadm_topo(self):
- url = test_utils.URL_CONFIG_ORDM_TOPO
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- node_list = res['network'][0]['node']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ node_list = response['network'][0]['node']
nb_updated_tp = 0
for node in node_list:
self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
- link_list = res['network'][0]['ietf-network-topology:link']
+ link_list = response['network'][0]['ietf-network-topology:link']
updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
nb_updated_link = 0
"administrative-state": "inService",
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"administrative-state": "outOfService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_43_check_update_portmapping(self):
- response = test_utils.portmapping_request("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
self.assertEqual(mapping['port-oper-state'], 'OutOfService',
time.sleep(1)
def test_44_check_update_openroadm_topo(self):
- url = test_utils.URL_CONFIG_ORDM_TOPO
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- node_list = res['network'][0]['node']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ node_list = response['network'][0]['node']
nb_updated_tp = 0
for node in node_list:
self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
- link_list = res['network'][0]['ietf-network-topology:link']
+ link_list = response['network'][0]['ietf-network-topology:link']
nb_updated_link = 0
for link in link_list:
self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
self.test_12_get_eth_service1()
def test_46_delete_eth_service1(self):
- response = test_utils.service_delete_request("service1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
+ response['output']['configuration-response-common']['response-message'])
time.sleep(self.WAITING)
def test_47_disconnect_xponders_from_roadm(self):
- url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
- response = test_utils.get_ordm_topo_request("")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- links = res['network'][0]['ietf-network-topology:link']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ links = response['network'][0]['ietf-network-topology:link']
for link in links:
if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
- link_name = link["link-id"]
- response = test_utils.delete_request(url+link_name)
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.del_ietf_network_link_request(
+ 'openroadm-topology', link['link-id'], 'config')
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_48_disconnect_XPDRA(self):
- response = test_utils.unmount_device("XPDRA01")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("XPDRA01")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_49_disconnect_XPDRC(self):
- response = test_utils.unmount_device("XPDR-C1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("XPDR-C1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_50_disconnect_ROADMA(self):
- response = test_utils.unmount_device("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_51_disconnect_ROADMC(self):
- response = test_utils.unmount_device("ROADM-C1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":