"""Helps in making GET REST calls"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
"""Helps in making GET REST calls"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
"""Helps in making POST REST calls without outputs"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
"""Helps in making POST REST calls without outputs"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
- resp = session.post(url, data.encode('utf-8'), headers=headers, auth=(userId, password))
+ resp = session.post(
+ url, data.encode("utf-8"), headers=headers, auth=(userId, password)
+ )
"""Helps in making POST REST calls"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
"""Helps in making POST REST calls"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
- resp = session.post(url, data.encode('utf-8'), headers=headers, auth=(userId, password))
+ resp = session.post(
+ url, data.encode("utf-8"), headers=headers, auth=(userId, password)
+ )
"""Helps in making DELET REST calls"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
"""Helps in making DELET REST calls"""
warnings.warn(
"Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
- '''Keyword to check if the given variable is of type NoneType. If the
- variable type does match raise an assertion so the keyword will fail
- '''
+ """Keyword to check if the given variable is of type NoneType. If the
+ variable type does match raise an assertion so the keyword will fail
+ """
def execute_ssh_command(ip, username, password, command):
def execute_ssh_command(ip, username, password, command):
use username and password of controller server for ssh and need
karaf distribution location like /root/Documents/dist
"""
use username and password of controller server for ssh and need
karaf distribution location like /root/Documents/dist
"""
- print "attempt " + str(i) + " response is " + str(resp)
- print resp.text
- if ('clustering-it-provider' in resp.text):
- print "Wait for controller " + ip + " succeeded"
+ print("attempt %s response is %s" % (str(i), str(resp)))
+ print(resp.text)
+ if "clustering-it-provider" in resp.text:
+ print("Wait for controller " + ip + " succeeded")
kill_controller(ip, username, password, karafHome)
def clean_journal(ip, username, password, karafHome):
kill_controller(ip, username, password, karafHome)
def clean_journal(ip, username, password, karafHome):
- execute_ssh_command(ip, username, password, "rm -rf " + karafHome + "/journal")
+ execute_ssh_command(ip, username, password, "rm -rf " + karafHome + "/*journal")
def kill_controller(ip, username, password, karafHome):
def kill_controller(ip, username, password, karafHome):
- execute_ssh_command(ip, username, password,
- "ps axf | grep karaf | grep -v grep | awk '{print \"kill -9 \" $1}' | sh")
+ execute_ssh_command(
+ ip,
+ username,
+ password,
+ "ps axf | grep karaf | grep -v grep | awk '{print \"kill -9 \" $1}' | sh",
+ )
def isolate_controller(controllers, username, password, isolated):
def isolate_controller(controllers, username, password, isolated):
:param controllers: A list of ip addresses or host names as strings.
:param username: Username for the controller to be isolated.
:param controllers: A list of ip addresses or host names as strings.
:param username: Username for the controller to be isolated.
isolated_controller = controllers[isolated - 1]
for controller in controllers:
if controller != isolated_controller:
isolated_controller = controllers[isolated - 1]
for controller in controllers:
if controller != isolated_controller:
execute_ssh_command(isolated_controller, username, password, cmd_str)
execute_ssh_command(isolated_controller, username, password, cmd_str)
execute_ssh_command(isolated_controller, username, password, cmd_str)
execute_ssh_command(isolated_controller, username, password, cmd_str)
- ip_tables = execute_ssh_command(isolated_controller, username, password, 'sudo iptables -L')
- print ip_tables
- iso_result = 'pass'
+ ip_tables = execute_ssh_command(
+ isolated_controller, username, password, "sudo iptables -L"
+ )
+ print(ip_tables)
+ iso_result = "pass"
def rejoin_controller(controllers, username, password, isolated):
def rejoin_controller(controllers, username, password, isolated):
:param controllers: A list of ip addresses or host names as strings.
:param username: Username for the isolated controller.
:param controllers: A list of ip addresses or host names as strings.
:param username: Username for the isolated controller.
isolated_controller = controllers[isolated - 1]
for controller in controllers:
if controller != isolated_controller:
isolated_controller = controllers[isolated - 1]
for controller in controllers:
if controller != isolated_controller:
execute_ssh_command(isolated_controller, username, password, cmd_str)
execute_ssh_command(isolated_controller, username, password, cmd_str)
execute_ssh_command(isolated_controller, username, password, cmd_str)
execute_ssh_command(isolated_controller, username, password, cmd_str)
- ip_tables = execute_ssh_command(isolated_controller, username, password, 'sudo iptables -L')
- print ip_tables
- iso_result = 'pass'
+ ip_tables = execute_ssh_command(
+ isolated_controller, username, password, "sudo iptables -L"
+ )
+ print(ip_tables)
+ iso_result = "pass"
:param password: Password for all controllers.
:return: If successful, returns "pass", otherwise returns "fail".
"""
:param password: Password for all controllers.
:return: If successful, returns "pass", otherwise returns "fail".
"""
- print 'Flushing ' + controller
- cmd_str = 'sudo iptables -v -F'
+ print("Flushing ", controller)
+ cmd_str = "sudo iptables -v -F"
cmd_result = execute_ssh_command(controller, username, password, cmd_str)
cmd_result = execute_ssh_command(controller, username, password, cmd_str)
success_string = "Flushing chain `INPUT'" + "\n"
success_string += "Flushing chain `FORWARD'" + "\n"
success_string += "Flushing chain `OUTPUT'"
if not cmd_result == success_string:
flush_result = "Failed to flush IPTables. Check Log."
success_string = "Flushing chain `INPUT'" + "\n"
success_string += "Flushing chain `FORWARD'" + "\n"
success_string += "Flushing chain `OUTPUT'"
if not cmd_result == success_string:
flush_result = "Failed to flush IPTables. Check Log."
- data = {'from': '0',
- 'size': '1',
- 'sort': [{'TimeStamp': {'order': 'desc'}}],
- 'query': {'query_string': {'query': query_String}}}
+ data = {
+ "from": "0",
+ "size": "1",
+ "sort": [{"TimeStamp": {"order": "desc"}}],
+ "query": {"query_string": {"query": query_String}},
+ }
return json.dumps(data)
def create_query_string_search(data_category, metric_name, node_id, rk_node_id):
return json.dumps(data)
def create_query_string_search(data_category, metric_name, node_id, rk_node_id):
- _cache.register(requests.session(), alias='CLUSTERING_GET')
- _cache.register(requests.session(), alias='CLUSTERING_POST')
- _cache.register(requests.session(), alias='CLUSTERING_DELETE')
+ _cache.register(requests.session(), alias="CLUSTERING_GET")
+ _cache.register(requests.session(), alias="CLUSTERING_POST")
+ _cache.register(requests.session(), alias="CLUSTERING_DELETE")