RESTCONF_PORT = 8181
RESTCONF_PATH_PREFIX = {'rfc8040': '/rests',
- 'lighty': '/restconf',
'draft-bierman02': '/restconf'}
+if 'USE_LIGHTY' in os.environ and os.environ['USE_LIGHTY'] == 'True':
+ RESTCONF_PATH_PREFIX['rfc8040'] = '/restconf'
+
if 'USE_ODL_RESTCONF_VERSION' in os.environ:
RESTCONF_VERSION = os.environ['USE_ODL_RESTCONF_VERSION']
if RESTCONF_VERSION not in RESTCONF_PATH_PREFIX:
def mount_device(node: str, sim: str):
url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}',
- 'lighty': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}',
'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}'}
body = {'node': [{
'node-id': node,
def unmount_device(node: str):
url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}',
- 'lighty': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}',
'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}'}
response = delete_request(url[RESTCONF_VERSION].format('{}', node))
if wait_until_log_contains(TPCE_LOG, re.escape("onDeviceDisConnected: " + node), 180):
def check_device_connection(node: str):
url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}?content=nonconfig',
- 'lighty': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}?content=nonconfig',
'draft-bierman02': '{}/operational/network-topology:network-topology/topology/topology-netconf/node/{}'}
response = get_request(url[RESTCONF_VERSION].format('{}', node))
res = response.json()
return_key = {'rfc8040': 'network-topology:node',
- 'lighty': 'network-topology:node',
'draft-bierman02': 'node'}
if return_key[RESTCONF_VERSION] in res.keys():
connection_status = res[return_key[RESTCONF_VERSION]][0]['netconf-node-topology:connection-status']
def check_node_request(node: str):
# pylint: disable=line-too-long
url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device?content=config', # nopep8
- 'lighty': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device?content=config', # nopep8
'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device'} # nopep8
response = get_request(url[RESTCONF_VERSION].format('{}', node))
res = response.json()
return_key = {'rfc8040': 'org-openroadm-device:org-openroadm-device',
- 'lighty': 'org-openroadm-device:org-openroadm-device',
'draft-bierman02': 'org-openroadm-device'}
if return_key[RESTCONF_VERSION] in res.keys():
response_attribute = res[return_key[RESTCONF_VERSION]]
def check_node_attribute_request(node: str, attribute: str, attribute_value: str):
# pylint: disable=line-too-long
url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}?content=nonconfig', # nopep8
- 'lighty': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}?content=nonconfig', # nopep8
'draft-bierman02': '{}/operational/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}'} # nopep8
response = get_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value))
res = response.json()
return_key = {'rfc8040': 'org-openroadm-device:' + attribute,
- 'lighty': 'org-openroadm-device:' + attribute,
'draft-bierman02': attribute}
if return_key[RESTCONF_VERSION] in res.keys():
response_attribute = res[return_key[RESTCONF_VERSION]]
def check_node_attribute2_request(node: str, attribute: str, attribute_value: str, attribute2: str):
# pylint: disable=line-too-long
url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}/{}?content=config', # nopep8
- 'lighty': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}/{}?content=config', # nopep8
'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}/{}'} # nopep8
response = get_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value, attribute2))
res = response.json()
def del_node_attribute_request(node: str, attribute: str, attribute_value: str):
# pylint: disable=line-too-long
url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}', # nopep8
- 'lighty': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}', # nopep8
'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}'} # nopep8
response = delete_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value))
return response
def post_portmapping(payload: str):
url = {'rfc8040': '{}/data/transportpce-portmapping:network',
- 'lighty': '{}/data/transportpce-portmapping:network',
'draft-bierman02': '{}/config/transportpce-portmapping:network'}
json_payload = json.loads(payload)
response = post_request(url[RESTCONF_VERSION].format('{}'), json_payload)
def del_portmapping():
url = {'rfc8040': '{}/data/transportpce-portmapping:network',
- 'lighty': '{}/data/transportpce-portmapping:network',
'draft-bierman02': '{}/config/transportpce-portmapping:network'}
response = delete_request(url[RESTCONF_VERSION].format('{}'))
return {'status_code': response.status_code}
def get_portmapping_node_attr(node: str, attr: str, value: str):
# pylint: disable=consider-using-f-string
url = {'rfc8040': '{}/data/transportpce-portmapping:network/nodes={}',
- 'lighty': '{}/data/transportpce-portmapping:network/nodes={}',
'draft-bierman02': '{}/config/transportpce-portmapping:network/nodes/{}'}
target_url = url[RESTCONF_VERSION].format('{}', node)
if attr is not None:
target_url = (target_url + '/{}').format('{}', attr)
if value is not None:
- suffix = {'rfc8040': '={}', 'lighty': '={}', 'draft-bierman02': '/{}'}
+ suffix = {'rfc8040': '={}', 'draft-bierman02': '/{}'}
target_url = (target_url + suffix[RESTCONF_VERSION]).format('{}', value)
else:
attr = 'nodes'
response = get_request(target_url)
res = response.json()
return_key = {'rfc8040': 'transportpce-portmapping:' + attr,
- 'lighty': 'transportpce-portmapping:' + attr,
'draft-bierman02': attr}
if return_key[RESTCONF_VERSION] in res.keys():
return_output = res[return_key[RESTCONF_VERSION]]
def get_ietf_network_request(network: str, content: str):
url = {'rfc8040': '{}/data/ietf-network:networks/network={}?content={}',
- 'lighty': '{}/data/ietf-network:networks/network={}?content={}',
'draft-bierman02': '{}/{}/ietf-network:networks/network/{}'}
- if RESTCONF_VERSION in ('rfc8040', 'lighty'):
+ if RESTCONF_VERSION in ('rfc8040'):
format_args = ('{}', network, content)
elif content == 'config':
format_args = ('{}', content, network)
if bool(response):
res = response.json()
return_key = {'rfc8040': 'ietf-network:network',
- 'lighty': 'ietf-network:network',
'draft-bierman02': 'network'}
networks = res[return_key[RESTCONF_VERSION]]
else:
def put_ietf_network(network: str, payload: str):
url = {'rfc8040': '{}/data/ietf-network:networks/network={}',
- 'lighty': '{}/data/ietf-network:networks/network={}',
'draft-bierman02': '{}/config/ietf-network:networks/network/{}'}
json_payload = json.loads(payload)
response = put_request(url[RESTCONF_VERSION].format('{}', network), json_payload)
def del_ietf_network(network: str):
url = {'rfc8040': '{}/data/ietf-network:networks/network={}',
- 'lighty': '{}/data/ietf-network:networks/network={}',
'draft-bierman02': '{}/config/ietf-network:networks/network/{}'}
response = delete_request(url[RESTCONF_VERSION].format('{}', network))
return {'status_code': response.status_code}
def get_ietf_network_link_request(network: str, link: str, content: str):
url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}?content={}',
- 'lighty': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}?content={}',
'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
- if RESTCONF_VERSION in ('rfc8040', 'lighty'):
+ if RESTCONF_VERSION in ('rfc8040'):
format_args = ('{}', network, link, content)
elif content == 'config':
format_args = ('{}', content, network, link)
response = get_request(url[RESTCONF_VERSION].format(*format_args))
res = response.json()
return_key = {'rfc8040': 'ietf-network-topology:link',
- 'lighty': 'ietf-network-topology:link',
'draft-bierman02': 'ietf-network-topology:link'}
link = res[return_key[RESTCONF_VERSION]][0]
return {'status_code': response.status_code,
def del_ietf_network_link_request(network: str, link: str, content: str):
url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}?content={}',
- 'lighty': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}?content={}',
'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
- if RESTCONF_VERSION in ('rfc8040', 'lighty'):
+ if RESTCONF_VERSION in ('rfc8040'):
format_args = ('{}', network, link, content)
elif content == 'config':
format_args = ('{}', content, network, link)
def add_oms_attr_request(link: str, oms_attr: str):
url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}',
- 'lighty': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}',
'draft-bierman02': '{}/config/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
url2 = url[RESTCONF_VERSION] + '/org-openroadm-network-topology:OMS-attributes/span'
network = 'openroadm-topology'
def del_oms_attr_request(link: str,):
url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}',
- 'lighty': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}',
'draft-bierman02': '{}/config/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
url2 = url[RESTCONF_VERSION] + '/org-openroadm-network-topology:OMS-attributes/span'
network = 'openroadm-topology'
def get_ietf_network_node_request(network: str, node: str, content: str):
url = {'rfc8040': '{}/data/ietf-network:networks/network={}/node={}?content={}',
- 'lighty': '{}/data/ietf-network:networks/network={}/node={}?content={}',
'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/node/{}'}
- if RESTCONF_VERSION in ('rfc8040', 'lighty'):
+ if RESTCONF_VERSION in ('rfc8040'):
format_args = ('{}', network, node, content)
elif content == 'config':
format_args = ('{}', content, network, node)
if bool(response):
res = response.json()
return_key = {'rfc8040': 'ietf-network:node',
- 'lighty': 'ietf-network:node',
'draft-bierman02': 'node'}
node = res[return_key[RESTCONF_VERSION]][0]
else:
def del_ietf_network_node_request(network: str, node: str, content: str):
url = {'rfc8040': '{}/data/ietf-network:networks/network={}/node={}?content={}',
- 'lighty': '{}/data/ietf-network:networks/network={}/node={}?content={}',
'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/node/{}'}
- if RESTCONF_VERSION in ('rfc8040', 'lighty'):
+ if RESTCONF_VERSION in ('rfc8040'):
format_args = ('{}', network, node, content)
elif content == 'config':
format_args = ('{}', content, network, node)
def get_ordm_serv_list_request():
url = {'rfc8040': '{}/data/org-openroadm-service:service-list?content=nonconfig',
- 'lighty': '{}/data/org-openroadm-service:service-list?content=nonconfig',
'draft-bierman02': '{}/operational/org-openroadm-service:service-list/'}
response = get_request(url[RESTCONF_VERSION])
res = response.json()
return_key = {'rfc8040': 'org-openroadm-service:service-list',
- 'lighty': 'org-openroadm-service:service-list',
'draft-bierman02': 'service-list'}
if return_key[RESTCONF_VERSION] in res.keys():
response_attribute = res[return_key[RESTCONF_VERSION]]
def get_ordm_serv_list_attr_request(attribute: str, value: str):
url = {'rfc8040': '{}/data/org-openroadm-service:service-list/{}={}?content=nonconfig',
- 'lighty': '{}/data/org-openroadm-service:service-list/{}={}?content=nonconfig',
'draft-bierman02': '{}/operational/org-openroadm-service:service-list/{}/{}'}
format_args = ('{}', attribute, value)
response = get_request(url[RESTCONF_VERSION].format(*format_args))
res = response.json()
return_key = {'rfc8040': 'org-openroadm-service:' + attribute,
- 'lighty': 'org-openroadm-service:' + attribute,
'draft-bierman02': attribute}
if return_key[RESTCONF_VERSION] in res.keys():
response_attribute = res[return_key[RESTCONF_VERSION]]
def get_serv_path_list_attr(attribute: str, value: str):
url = {'rfc8040': '{}/data/transportpce-service-path:service-path-list/{}={}?content=nonconfig',
- 'lighty': '{}/data/transportpce-service-path:service-path-list/{}={}?content=nonconfig',
'draft-bierman02': '{}/operational/transportpce-service-path:service-path-list/{}/{}'}
response = get_request(url[RESTCONF_VERSION].format('{}', attribute, value))
res = response.json()
return_key = {'rfc8040': 'transportpce-service-path:' + attribute,
- 'lighty': 'transportpce-service-path:' + attribute,
'draft-bierman02': attribute}
if return_key[RESTCONF_VERSION] in res.keys():
response_attribute = res[return_key[RESTCONF_VERSION]]
else:
res = response.json()
return_key = {'rfc8040': api_module + ':output',
- 'lighty': api_module + ':output',
'draft-bierman02': 'output'}
if response.status_code == requests.codes.internal_server_error:
return_output = res