URL_OTN_SERVICE_PATH = "{}/operations/transportpce-device-renderer:otn-service-path"
URL_TAPI_CREATE_CONNECTIVITY = "{}/operations/tapi-connectivity:create-connectivity-service"
URL_TAPI_DELETE_CONNECTIVITY = "{}/operations/tapi-connectivity:delete-connectivity-service"
+URL_TAPI_GET_CONNECTIVITY = "{}/operations/tapi-connectivity:get-connectivity-service-details"
URL_CREATE_OTS_OMS = "{}/operations/transportpce-device-renderer:create-ots-oms"
URL_PATH_COMPUTATION_REQUEST = "{}/operations/transportpce-pce:path-computation-request"
URL_FULL_PORTMAPPING = "{}/config/transportpce-portmapping:network"
URL_TAPI_TOPOLOGY_DETAILS = "{}/operations/tapi-topology:get-topology-details"
URL_TAPI_NODE_DETAILS = "{}/operations/tapi-topology:get-node-details"
+URL_TAPI_NEP_DETAILS = "{}/operations/tapi-topology:get-node-edge-point-details"
URL_TAPI_SIP_LIST = "{}/operations/tapi-common:get-service-interface-point-list"
URL_TAPI_SERVICE_LIST = "{}/operations/tapi-connectivity:get-connectivity-service-list"
+URL_TAPI_NOTIFICATION_SUBS_SERVICE = "{}/operations/tapi-notification:create-notification-subscription-service"
+URL_TAPI_GET_NOTIFICATION_LIST = "{}/operations/tapi-notification:get-notification-list"
TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'}
TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}
+REQUEST_TIMEOUT = 10
+
CODE_SHOULD_BE_200 = 'Http status code should be 200'
CODE_SHOULD_BE_201 = 'Http status code should be 201'
return requests.request(
"GET", url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def post_request(url, data):
"POST", url.format(RESTCONF_BASE_URL),
data=json.dumps(data),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
return requests.request(
"POST", url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def post_xmlrequest(url, data):
"POST", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_XML,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
return None
"PUT", url.format(RESTCONF_BASE_URL),
data=json.dumps(data),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def put_xmlrequest(url, data):
"PUT", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_XML,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def put_jsonrequest(url, data):
"PUT", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def rawput_request(url, data):
"PUT", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def rawpost_request(url, data):
"POST", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def delete_request(url):
return requests.request(
"DELETE", url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def mount_device(node_id, sim):
return post_request(URL_TAPI_CREATE_CONNECTIVITY, topologyidorname)
+def tapi_get_connectivity_request(serviceidorname):
+ attr = {
+ "input": {
+ "service-id-or-name": serviceidorname}}
+ return post_request(URL_TAPI_GET_CONNECTIVITY, attr)
+
+
def tapi_delete_connectivity_request(serviceidorname):
attr = {
"input": {
return post_request(URL_TAPI_NODE_DETAILS, attr)
+def tapi_get_node_edge_point_details_request(topologyidorname, nodeidorname, nepidorname):
+ attr = {
+ "input": {
+ "topology-id-or-name": topologyidorname,
+ "node-id-or-name": nodeidorname,
+ "ep-id-or-name": nepidorname}}
+ return post_request(URL_TAPI_NEP_DETAILS, attr)
+
+
def tapi_get_sip_details_request():
return post_request(URL_TAPI_SIP_LIST, "")
return post_request(URL_TAPI_SERVICE_LIST, "")
+def tapi_create_notification_subscription_service_request(attr):
+ return post_request(URL_TAPI_NOTIFICATION_SUBS_SERVICE, attr)
+
+
+def tapi_get_notifications_list_request(attr):
+ return post_request(URL_TAPI_GET_NOTIFICATION_LIST, attr)
+
+
def shutdown_process(process):
if process is not None:
for child in psutil.Process(process.pid).children():