TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'}
TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}
+REQUEST_TIMEOUT = 10
+
CODE_SHOULD_BE_200 = 'Http status code should be 200'
CODE_SHOULD_BE_201 = 'Http status code should be 201'
return requests.request(
'GET', url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def put_request(url, data):
'PUT', url.format(RESTCONF_BASE_URL),
data=json.dumps(data),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def delete_request(url):
return requests.request(
'DELETE', url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def post_request(url, data):
"POST", url.format(RESTCONF_BASE_URL),
data=json.dumps(data),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
return requests.request(
"POST", url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
#
# Process management
attribute: response_attribute}
+def get_serv_path_list_attr(attribute: str, value: str):
+ url = {'rfc8040': '{}/data/transportpce-service-path:service-path-list/{}={}?content=nonconfig',
+ 'draft-bierman02': '{}/operational/transportpce-service-path:service-path-list/{}/{}'}
+ response = get_request(url[RESTCONF_VERSION].format('{}', attribute, value))
+ res = response.json()
+ return_key = {'rfc8040': 'transportpce-service-path:' + attribute,
+ 'draft-bierman02': attribute}
+ if return_key[RESTCONF_VERSION] in res.keys():
+ response_attribute = res[return_key[RESTCONF_VERSION]]
+ else:
+ response_attribute = res['errors']['error'][0]
+ return {'status_code': response.status_code,
+ attribute: response_attribute}
+
+
#
# TransportPCE internal API RPCs
#
else:
data = {'input': payload}
response = post_request(url, data)
- res = response.json()
- return_key = {'rfc8040': api_module + ':output',
- 'draft-bierman02': 'output'}
- return_output = res[return_key[RESTCONF_VERSION]]
+ if response.status_code == requests.codes.no_content:
+ return_output = None
+ else:
+ res = response.json()
+ return_key = {'rfc8040': api_module + ':output',
+ 'draft-bierman02': 'output'}
+ return_output = res[return_key[RESTCONF_VERSION]]
return {'status_code': response.status_code,
'output': return_output}