global _cache
-def get(url, userId='admin', password='admin'):
+def get(url, userId="admin", password="admin"):
"""Helps in making GET REST calls"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
- DeprecationWarning
+ DeprecationWarning,
)
headers = {}
- headers['Accept'] = 'application/xml'
+ headers["Accept"] = "application/xml"
# Send the GET request
session = _cache.switch("CLUSTERING_GET")
"""Helps in making POST REST calls without outputs"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
- DeprecationWarning
+ DeprecationWarning,
)
if userId is None:
- userId = 'admin'
+ userId = "admin"
if password is None:
- password = 'admin'
+ password = "admin"
headers = {}
- headers['Content-Type'] = 'application/json'
+ headers["Content-Type"] = "application/json"
# headers['Accept']= 'application/xml'
session = _cache.switch("CLUSTERING_POST")
- resp = session.post(url, data.encode('utf-8'), headers=headers, auth=(userId, password))
+ resp = session.post(
+ url, data.encode("utf-8"), headers=headers, auth=(userId, password)
+ )
return resp
"""Helps in making POST REST calls"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
- DeprecationWarning
+ DeprecationWarning,
)
if userId is None:
- userId = 'admin'
+ userId = "admin"
if password is None:
- password = 'admin'
+ password = "admin"
print("post request with url " + url)
print("post request with data " + data)
headers = {}
- headers['Content-Type'] = 'application/json'
+ headers["Content-Type"] = "application/json"
# headers['Accept'] = 'application/xml'
session = _cache.switch("CLUSTERING_POST")
- resp = session.post(url, data.encode('utf-8'), headers=headers, auth=(userId, password))
+ resp = session.post(
+ url, data.encode("utf-8"), headers=headers, auth=(userId, password)
+ )
# print(resp.raise_for_status())
print(resp.headers)
return resp
-def delete(url, userId='admin', password='admin'):
+def delete(url, userId="admin", password="admin"):
"""Helps in making DELET REST calls"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
- DeprecationWarning
+ DeprecationWarning,
)
print("delete all resources belonging to url" + url)
session = _cache.switch("CLUSTERING_DELETE")
def Should_Not_Be_Type_None(var):
- '''Keyword to check if the given variable is of type NoneType. If the
- variable type does match raise an assertion so the keyword will fail
- '''
+ """Keyword to check if the given variable is of type NoneType. If the
+ variable type does match raise an assertion so the keyword will fail
+ """
if var is None:
- raise AssertionError('the variable passed was type NoneType')
- return 'PASS'
+ raise AssertionError("the variable passed was type NoneType")
+ return "PASS"
def execute_ssh_command(ip, username, password, command):
def wait_for_controller_up(ip, port="8181"):
- url = "http://" + ip + ":" + str(port) + \
- "/restconf/config/opendaylight-inventory:nodes/node/controller-config/yang-ext:mount/config:modules"
+ url = (
+ "http://"
+ + ip
+ + ":"
+ + str(port)
+ + "/restconf/config/opendaylight-inventory:nodes/node/controller-config/yang-ext:mount/config:modules"
+ )
print("Waiting for controller " + ip + " up.")
# Try 30*10s=5 minutes for the controller to be up.
resp = get(url, "admin", "admin")
print("attempt %s response is %s" % (str(i), str(resp)))
print(resp.text)
- if ('clustering-it-provider' in resp.text):
+ if "clustering-it-provider" in resp.text:
print("Wait for controller " + ip + " succeeded")
return True
except Exception as e:
i = 1
while i <= tries:
stdout = lib.execute_command("ps -axf | grep karaf | grep -v grep | wc -l")
- processCnt = stdout[0].strip('\n')
+ processCnt = stdout[0].strip("\n")
print("processCnt: " + processCnt)
- if processCnt == '0':
+ if processCnt == "0":
break
i = i + 1
time.sleep(3)
def clean_journal(ip, username, password, karafHome):
- execute_ssh_command(ip, username, password, "rm -rf " + karafHome + "/journal")
+ execute_ssh_command(ip, username, password, "rm -rf " + karafHome + "/*journal")
def kill_controller(ip, username, password, karafHome):
- execute_ssh_command(ip, username, password,
- "ps axf | grep karaf | grep -v grep | awk '{print \"kill -9 \" $1}' | sh")
+ execute_ssh_command(
+ ip,
+ username,
+ password,
+ "ps axf | grep karaf | grep -v grep | awk '{print \"kill -9 \" $1}' | sh",
+ )
def isolate_controller(controllers, username, password, isolated):
- """ Isolate one controller from the others in the cluster
+ """Isolate one controller from the others in the cluster
:param controllers: A list of ip addresses or host names as strings.
:param username: Username for the controller to be isolated.
isolated_controller = controllers[isolated - 1]
for controller in controllers:
if controller != isolated_controller:
- base_str = 'sudo iptables -I OUTPUT -p all --source '
- cmd_str = base_str + isolated_controller + ' --destination ' + controller + ' -j DROP'
+ base_str = "sudo iptables -I OUTPUT -p all --source "
+ cmd_str = (
+ base_str
+ + isolated_controller
+ + " --destination "
+ + controller
+ + " -j DROP"
+ )
execute_ssh_command(isolated_controller, username, password, cmd_str)
- cmd_str = base_str + controller + ' --destination ' + isolated_controller + ' -j DROP'
+ cmd_str = (
+ base_str
+ + controller
+ + " --destination "
+ + isolated_controller
+ + " -j DROP"
+ )
execute_ssh_command(isolated_controller, username, password, cmd_str)
- ip_tables = execute_ssh_command(isolated_controller, username, password, 'sudo iptables -L')
+ ip_tables = execute_ssh_command(
+ isolated_controller, username, password, "sudo iptables -L"
+ )
print(ip_tables)
- iso_result = 'pass'
+ iso_result = "pass"
for controller in controllers:
- controller_regex_string = "[\s\S]*" + isolated_controller + " *" + controller + "[\s\S]*"
+ controller_regex_string = (
+ "[\s\S]*" + isolated_controller + " *" + controller + "[\s\S]*"
+ )
controller_regex = re.compile(controller_regex_string)
if not controller_regex.match(ip_tables):
iso_result = ip_tables
- controller_regex_string = "[\s\S]*" + controller + " *" + isolated_controller + "[\s\S]*"
+ controller_regex_string = (
+ "[\s\S]*" + controller + " *" + isolated_controller + "[\s\S]*"
+ )
controller_regex = re.compile(controller_regex_string)
if not controller_regex.match(ip_tables):
iso_result = ip_tables
def rejoin_controller(controllers, username, password, isolated):
- """ Return an isolated controller to the cluster.
+ """Return an isolated controller to the cluster.
:param controllers: A list of ip addresses or host names as strings.
:param username: Username for the isolated controller.
isolated_controller = controllers[isolated - 1]
for controller in controllers:
if controller != isolated_controller:
- base_str = 'sudo iptables -D OUTPUT -p all --source '
- cmd_str = base_str + isolated_controller + ' --destination ' + controller + ' -j DROP'
+ base_str = "sudo iptables -D OUTPUT -p all --source "
+ cmd_str = (
+ base_str
+ + isolated_controller
+ + " --destination "
+ + controller
+ + " -j DROP"
+ )
execute_ssh_command(isolated_controller, username, password, cmd_str)
- cmd_str = base_str + controller + ' --destination ' + isolated_controller + ' -j DROP'
+ cmd_str = (
+ base_str
+ + controller
+ + " --destination "
+ + isolated_controller
+ + " -j DROP"
+ )
execute_ssh_command(isolated_controller, username, password, cmd_str)
- ip_tables = execute_ssh_command(isolated_controller, username, password, 'sudo iptables -L')
+ ip_tables = execute_ssh_command(
+ isolated_controller, username, password, "sudo iptables -L"
+ )
print(ip_tables)
- iso_result = 'pass'
+ iso_result = "pass"
for controller in controllers:
- controller_regex_string = "[\s\S]*" + isolated_controller + " *" + controller + "[\s\S]*"
+ controller_regex_string = (
+ "[\s\S]*" + isolated_controller + " *" + controller + "[\s\S]*"
+ )
controller_regex = re.compile(controller_regex_string)
if controller_regex.match(ip_tables):
iso_result = ip_tables
- controller_regex_string = "[\s\S]*" + controller + " *" + isolated_controller + "[\s\S]*"
+ controller_regex_string = (
+ "[\s\S]*" + controller + " *" + isolated_controller + "[\s\S]*"
+ )
controller_regex = re.compile(controller_regex_string)
if controller_regex.match(ip_tables):
iso_result = ip_tables
:param password: Password for all controllers.
:return: If successful, returns "pass", otherwise returns "fail".
"""
- flush_result = 'pass'
+ flush_result = "pass"
for controller in controllers:
- print('Flushing ', controller)
- cmd_str = 'sudo iptables -v -F'
+ print("Flushing ", controller)
+ cmd_str = "sudo iptables -v -F"
cmd_result = execute_ssh_command(controller, username, password, cmd_str)
print(cmd_result)
success_string = "Flushing chain `INPUT'" + "\n"
def build_elastic_search_JSON_request(query_String):
- data = {'from': '0',
- 'size': '1',
- 'sort': [{'TimeStamp': {'order': 'desc'}}],
- 'query': {'query_string': {'query': query_String}}}
+ data = {
+ "from": "0",
+ "size": "1",
+ "sort": [{"TimeStamp": {"order": "desc"}}],
+ "query": {"query_string": {"query": query_String}},
+ }
return json.dumps(data)
def create_query_string_search(data_category, metric_name, node_id, rk_node_id):
- query = 'TSDRDataCategory:'
+ query = "TSDRDataCategory:"
query += data_category
- query += ' AND MetricName:'
+ query += " AND MetricName:"
query += metric_name
- query += ' AND NodeID:\"'
+ query += ' AND NodeID:"'
query += node_id
- query += '\" AND RecordKeys.KeyValue:\"'
+ query += '" AND RecordKeys.KeyValue:"'
query += rk_node_id
- query += '\" AND RecordKeys.KeyName:Node AND RecordKeys.KeyValue:0 AND RecordKeys.KeyName:Table'
+ query += '" AND RecordKeys.KeyName:Node AND RecordKeys.KeyValue:0 AND RecordKeys.KeyName:Table'
return query
def create_query_string_count(data_category):
- query = 'TSDRDataCategory:'
+ query = "TSDRDataCategory:"
query += data_category
return query
def extract_metric_value_search(response):
- return str(response['hits']['hits'][0]['_source']['MetricValue'])
+ return str(response["hits"]["hits"][0]["_source"]["MetricValue"])
def extract_metric_value_count(response):
- return int(response['hits']['total'])
+ return int(response["hits"]["total"])
#
# main invoked
if __name__ != "__main__":
- _cache = robot.utils.ConnectionCache('No sessions created')
+ _cache = robot.utils.ConnectionCache("No sessions created")
# here create one session for each HTTP functions
- _cache.register(requests.session(), alias='CLUSTERING_GET')
- _cache.register(requests.session(), alias='CLUSTERING_POST')
- _cache.register(requests.session(), alias='CLUSTERING_DELETE')
+ _cache.register(requests.session(), alias="CLUSTERING_GET")
+ _cache.register(requests.session(), alias="CLUSTERING_POST")
+ _cache.register(requests.session(), alias="CLUSTERING_DELETE")