URL_CONFIG_ORDM_NET = "{}/config/ietf-network:networks/network/openroadm-network/"
URL_PORTMAPPING = "{}/config/transportpce-portmapping:network/nodes/"
URL_OPER_SERV_LIST = "{}/operational/org-openroadm-service:service-list/"
+URL_OPER_SERV_PATH_LIST = "{}/operational/transportpce-service-path:service-path-list/"
URL_GET_NBINOTIFICATIONS_PROCESS_SERV = "{}/operations/nbi-notifications:get-notifications-process-service/"
URL_GET_NBINOTIFICATIONS_ALARM_SERV = "{}/operations/nbi-notifications:get-notifications-alarm-service/"
URL_SERV_CREATE = "{}/operations/org-openroadm-service:service-create"
URL_OTN_SERVICE_PATH = "{}/operations/transportpce-device-renderer:otn-service-path"
URL_TAPI_CREATE_CONNECTIVITY = "{}/operations/tapi-connectivity:create-connectivity-service"
URL_TAPI_DELETE_CONNECTIVITY = "{}/operations/tapi-connectivity:delete-connectivity-service"
+URL_TAPI_GET_CONNECTIVITY = "{}/operations/tapi-connectivity:get-connectivity-service-details"
URL_CREATE_OTS_OMS = "{}/operations/transportpce-device-renderer:create-ots-oms"
URL_PATH_COMPUTATION_REQUEST = "{}/operations/transportpce-pce:path-computation-request"
URL_FULL_PORTMAPPING = "{}/config/transportpce-portmapping:network"
URL_TAPI_TOPOLOGY_DETAILS = "{}/operations/tapi-topology:get-topology-details"
URL_TAPI_NODE_DETAILS = "{}/operations/tapi-topology:get-node-details"
+URL_TAPI_NEP_DETAILS = "{}/operations/tapi-topology:get-node-edge-point-details"
URL_TAPI_SIP_LIST = "{}/operations/tapi-common:get-service-interface-point-list"
URL_TAPI_SERVICE_LIST = "{}/operations/tapi-connectivity:get-connectivity-service-list"
+URL_TAPI_NOTIFICATION_SUBS_SERVICE = "{}/operations/tapi-notification:create-notification-subscription-service"
+URL_TAPI_GET_NOTIFICATION_LIST = "{}/operations/tapi-notification:get-notification-list"
TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'}
TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}
+REQUEST_TIMEOUT = 10
+
CODE_SHOULD_BE_200 = 'Http status code should be 200'
CODE_SHOULD_BE_201 = 'Http status code should be 201'
return requests.request(
"GET", url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def post_request(url, data):
"POST", url.format(RESTCONF_BASE_URL),
data=json.dumps(data),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
return requests.request(
"POST", url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def post_xmlrequest(url, data):
"POST", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_XML,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
return None
"PUT", url.format(RESTCONF_BASE_URL),
data=json.dumps(data),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def put_xmlrequest(url, data):
"PUT", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_XML,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def put_jsonrequest(url, data):
"PUT", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def rawput_request(url, data):
"PUT", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def rawpost_request(url, data):
"POST", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def delete_request(url):
return requests.request(
"DELETE", url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def mount_device(node_id, sim):
def get_service_list_request(suffix: str):
- url = URL_OPER_SERV_LIST + suffix
- return get_request(url)
+ return get_request(URL_OPER_SERV_LIST + suffix)
+
+
+def get_service_path_list_request(suffix: str):
+ return get_request(URL_OPER_SERV_PATH_LIST + suffix)
def service_create_request(attr):
return post_request(URL_TAPI_CREATE_CONNECTIVITY, topologyidorname)
+def tapi_get_connectivity_request(serviceidorname):
+ attr = {
+ "input": {
+ "service-id-or-name": serviceidorname}}
+ return post_request(URL_TAPI_GET_CONNECTIVITY, attr)
+
+
def tapi_delete_connectivity_request(serviceidorname):
attr = {
"input": {
return post_request(URL_TAPI_NODE_DETAILS, attr)
+def tapi_get_node_edge_point_details_request(topologyidorname, nodeidorname, nepidorname):
+ attr = {
+ "input": {
+ "topology-id-or-name": topologyidorname,
+ "node-id-or-name": nodeidorname,
+ "ep-id-or-name": nepidorname}}
+ return post_request(URL_TAPI_NEP_DETAILS, attr)
+
+
def tapi_get_sip_details_request():
return post_request(URL_TAPI_SIP_LIST, "")
return post_request(URL_TAPI_SERVICE_LIST, "")
+def tapi_create_notification_subscription_service_request(attr):
+ return post_request(URL_TAPI_NOTIFICATION_SUBS_SERVICE, attr)
+
+
+def tapi_get_notifications_list_request(attr):
+ return post_request(URL_TAPI_GET_NOTIFICATION_LIST, attr)
+
+
def shutdown_process(process):
if process is not None:
for child in psutil.Process(process.pid).children():