else:
RESTCONF_VERSION = 'rfc8040'
-RESTCONF_BASE_URL = 'http://localhost:' + RESTCONF_PORT + RESTCONF_PATH_PREFIX[RESTCONF_VERSION]
+RESTCONF_BASE_URL = 'http://localhost:' + str(RESTCONF_PORT) + RESTCONF_PATH_PREFIX[RESTCONF_VERSION]
if 'USE_ODL_ALT_KARAF_INSTALL_DIR' in os.environ:
KARAF_INSTALLDIR = os.environ['USE_ODL_ALT_KARAF_INSTALL_DIR']
attribute: response_attribute}
+def check_node_attribute2_request(node: str, attribute: str, attribute_value: str, attribute2: str):
+ # pylint: disable=line-too-long
+ url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}/{}?content=config', # nopep8
+ 'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}/{}'} # nopep8
+ response = get_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value, attribute2))
+ res = response.json()
+ if attribute2 in res.keys():
+ response_attribute = res[attribute2]
+ else:
+ response_attribute = res['errors']['error'][0]
+ return {'status_code': response.status_code,
+ attribute2: response_attribute}
+
+
def del_node_attribute_request(node: str, attribute: str, attribute_value: str):
# pylint: disable=line-too-long
url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}', # nopep8
'mapping': mapping}
+def portmapping_switching_pool_request(node: str, switching_pool: str):
+ url = {'rfc8040': '{}/data/transportpce-portmapping:network/nodes={}/switching-pool-lcp={}',
+ 'draft-bierman02': '{}/config/transportpce-portmapping:network/nodes/{}/switching-pool-lcp/{}'}
+ response = get_request(url[RESTCONF_VERSION].format('{}', node, switching_pool))
+ res = response.json()
+ return_key = {'rfc8040': 'transportpce-portmapping:switching-pool-lcp',
+ 'draft-bierman02': 'switching-pool-lcp'}
+ switching_pool = res[return_key[RESTCONF_VERSION]]
+ return {'status_code': response.status_code,
+ 'switching_pool': switching_pool}
+
+
def portmapping_mc_capa_request(node: str, mc_capa: str):
url = {'rfc8040': '{}/data/transportpce-portmapping:network/nodes={}/mc-capabilities={}',
'draft-bierman02': '{}/config/transportpce-portmapping:network/nodes/{}/mc-capabilities/{}'}
return response
#
-# TransportPCE network-utils and service-path operations
+# TransportPCE internal API RPCs
#
return post_request(url, data)
-def device_renderer_service_path_request(payload: dict):
- url = "{}/operations/transportpce-device-renderer:service-path"
+def device_renderer_create_ots_oms_request(nodeid: str, lcp: str):
+ url = "{}/operations/transportpce-device-renderer:create-ots-oms"
+ payload = {
+ 'node-id': nodeid,
+ 'logical-connection-point': lcp}
if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys(payload, 'transportpce-device-renderer:')
+ data = prepend_dict_keys({'input': payload}, 'transportpce-device-renderer:')
else:
- data = payload
+ data = {'input': payload}
return post_request(url, data)
-def device_renderer_otn_service_path_request(payload: dict):
- url = "{}/operations/transportpce-device-renderer:otn-service-path"
- if RESTCONF_VERSION == 'draft-bierman02':
- data = prepend_dict_keys(payload, 'transportpce-device-renderer:')
+def transportpce_api_rpc_request(api_module: str, rpc: str, payload: dict):
+ # pylint: disable=consider-using-f-string
+ url = "{}/operations/{}:{}".format('{}', api_module, rpc)
+ if payload is None:
+ data = None
+ elif RESTCONF_VERSION == 'draft-bierman02':
+ data = prepend_dict_keys({'input': payload}, api_module + ':')
else:
- data = payload
+ data = {'input': payload}
response = post_request(url, data)
res = response.json()
- return_key = {'rfc8040': 'transportpce-device-renderer:output',
+ return_key = {'rfc8040': api_module + ':output',
'draft-bierman02': 'output'}
return_output = res[return_key[RESTCONF_VERSION]]
return {'status_code': response.status_code,