# pylint: disable=no-member
# pylint: disable=too-many-public-methods
# a pylint false positive due to unittest
-# pylint: disable=no-self-use
import time
import unittest
TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'}
TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}
+REQUEST_TIMEOUT = 10
+
CODE_SHOULD_BE_200 = 'Http status code should be 200'
CODE_SHOULD_BE_201 = 'Http status code should be 201'
return requests.request(
"GET", url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def post_request(url, data):
"POST", url.format(RESTCONF_BASE_URL),
data=json.dumps(data),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
return requests.request(
"POST", url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def post_xmlrequest(url, data):
"POST", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_XML,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
return None
"PUT", url.format(RESTCONF_BASE_URL),
data=json.dumps(data),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def put_xmlrequest(url, data):
"PUT", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_XML,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def put_jsonrequest(url, data):
"PUT", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def rawput_request(url, data):
"PUT", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def rawpost_request(url, data):
"POST", url.format(RESTCONF_BASE_URL),
data=data,
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def delete_request(url):
return requests.request(
"DELETE", url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def mount_device(node_id, sim):
TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'}
TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}
+REQUEST_TIMEOUT = 10
+
CODE_SHOULD_BE_200 = 'Http status code should be 200'
CODE_SHOULD_BE_201 = 'Http status code should be 201'
return requests.request(
'GET', url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def put_request(url, data):
'PUT', url.format(RESTCONF_BASE_URL),
data=json.dumps(data),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def delete_request(url):
return requests.request(
'DELETE', url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
def post_request(url, data):
"POST", url.format(RESTCONF_BASE_URL),
data=json.dumps(data),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
return requests.request(
"POST", url.format(RESTCONF_BASE_URL),
headers=TYPE_APPLICATION_JSON,
- auth=(ODL_LOGIN, ODL_PWD))
+ auth=(ODL_LOGIN, ODL_PWD),
+ timeout=REQUEST_TIMEOUT)
#
# Process management
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8130/restconf"),
data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8154/restconf"),
data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
}
response = requests.request("POST", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.json()['output']['status-message'], "The PMs has been successfully set !")
time.sleep(self.WAITING * 2)
}
response = requests.request("POST", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
- auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD))
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.json()['output']['status-message'], "The PMs has been successfully released !")
time.sleep(2)
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
# If the gate fails is because of the waiting time not being enough
time.sleep(2)
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
+ timeout=test_utils.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
# If the gate fails is because of the waiting time not being enough
time.sleep(2)