Fix robot file format issues
[integration/test.git] / csit / libraries / UtilLibrary.py
index e244c334956332de5fd624dc749c52ffe16e8654..6f65fd2061595c2be806a6bebeacd36dce56d465 100644 (file)
@@ -17,14 +17,14 @@ __email__ = "syedbahm@cisco.com"
 global _cache
 
 
-def get(url, userId='admin', password='admin'):
+def get(url, userId="admin", password="admin"):
     """Helps in making GET REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
     headers = {}
-    headers['Accept'] = 'application/xml'
+    headers["Accept"] = "application/xml"
 
     # Send the GET request
     session = _cache.switch("CLUSTERING_GET")
@@ -38,21 +38,23 @@ def nonprintpost(url, userId, password, data):
     """Helps in making POST REST calls without outputs"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
 
     if userId is None:
-        userId = 'admin'
+        userId = "admin"
 
     if password is None:
-        password = 'admin'
+        password = "admin"
 
     headers = {}
-    headers['Content-Type'] = 'application/json'
+    headers["Content-Type"] = "application/json"
     # headers['Accept']= 'application/xml'
 
     session = _cache.switch("CLUSTERING_POST")
-    resp = session.post(url, data.encode('utf-8'), headers=headers, auth=(userId, password))
+    resp = session.post(
+        url, data.encode("utf-8"), headers=headers, auth=(userId, password)
+    )
 
     return resp
 
@@ -61,22 +63,24 @@ def post(url, userId, password, data):
     """Helps in making POST REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
 
     if userId is None:
-        userId = 'admin'
+        userId = "admin"
 
     if password is None:
-        password = 'admin'
+        password = "admin"
 
     print("post request with url " + url)
     print("post request with data " + data)
     headers = {}
-    headers['Content-Type'] = 'application/json'
+    headers["Content-Type"] = "application/json"
     # headers['Accept'] = 'application/xml'
     session = _cache.switch("CLUSTERING_POST")
-    resp = session.post(url, data.encode('utf-8'), headers=headers, auth=(userId, password))
+    resp = session.post(
+        url, data.encode("utf-8"), headers=headers, auth=(userId, password)
+    )
 
     # print(resp.raise_for_status())
     print(resp.headers)
@@ -86,11 +90,11 @@ def post(url, userId, password, data):
     return resp
 
 
-def delete(url, userId='admin', password='admin'):
+def delete(url, userId="admin", password="admin"):
     """Helps in making DELET REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
     print("delete all resources belonging to url" + url)
     session = _cache.switch("CLUSTERING_DELETE")
@@ -98,12 +102,12 @@ def delete(url, userId='admin', password='admin'):
 
 
 def Should_Not_Be_Type_None(var):
-    '''Keyword to check if the given variable is of type NoneType.  If the
-        variable type does match  raise an assertion so the keyword will fail
-    '''
+    """Keyword to check if the given variable is of type NoneType.  If the
+    variable type does match  raise an assertion so the keyword will fail
+    """
     if var is None:
-        raise AssertionError('the variable passed was type NoneType')
-    return 'PASS'
+        raise AssertionError("the variable passed was type NoneType")
+    return "PASS"
 
 
 def execute_ssh_command(ip, username, password, command):
@@ -112,37 +116,42 @@ def execute_ssh_command(ip, username, password, command):
     use username and password of controller server for ssh and need
     karaf distribution location like /root/Documents/dist
     """
-    print "executing ssh command"
+    print("executing ssh command")
     lib = SSHLibrary()
     lib.open_connection(ip)
     lib.login(username=username, password=password)
-    print "login done"
+    print("login done")
     cmd_response = lib.execute_command(command)
-    print "command executed : " + command
+    print("command executed : " + command)
     lib.close_connection()
     return cmd_response
 
 
 def wait_for_controller_up(ip, port="8181"):
-    url = "http://" + ip + ":" + str(port) + \
-          "/restconf/config/opendaylight-inventory:nodes/node/controller-config/yang-ext:mount/config:modules"
+    url = (
+        "http://"
+        + ip
+        + ":"
+        + str(port)
+        + "/restconf/config/opendaylight-inventory:nodes/node/controller-config/yang-ext:mount/config:modules"
+    )
 
-    print "Waiting for controller " + ip + " up."
+    print("Waiting for controller " + ip + " up.")
     # Try 30*10s=5 minutes for the controller to be up.
     for i in xrange(30):
         try:
-            print "attempt " + str(i) + " to url " + url
+            print("attempt %s to url %s" % (str(i), url))
             resp = get(url, "admin", "admin")
-            print "attempt " + str(i) + " response is " + str(resp)
-            print resp.text
-            if ('clustering-it-provider' in resp.text):
-                print "Wait for controller " + ip + " succeeded"
+            print("attempt %s response is %s" % (str(i), str(resp)))
+            print(resp.text)
+            if "clustering-it-provider" in resp.text:
+                print("Wait for controller " + ip + " succeeded")
                 return True
         except Exception as e:
-            print e
+            print(e)
         time.sleep(10)
 
-    print "Wait for controller " + ip + " failed"
+    print("Wait for controller " + ip + " failed")
     return False
 
 
@@ -192,10 +201,9 @@ def wait_for_controller_stopped(ip, username, password, karafHome):
     i = 1
     while i <= tries:
         stdout = lib.execute_command("ps -axf | grep karaf | grep -v grep | wc -l")
-        # print "stdout: "+stdout
-        processCnt = stdout[0].strip('\n')
+        processCnt = stdout[0].strip("\n")
         print("processCnt: " + processCnt)
-        if processCnt == '0':
+        if processCnt == "0":
             break
         i = i + 1
         time.sleep(3)
@@ -203,21 +211,25 @@ def wait_for_controller_stopped(ip, username, password, karafHome):
     lib.close_connection()
 
     if i > tries:
-        print "Killing controller"
+        print("Killing controller")
         kill_controller(ip, username, password, karafHome)
 
 
 def clean_journal(ip, username, password, karafHome):
-    execute_ssh_command(ip, username, password, "rm -rf " + karafHome + "/journal")
+    execute_ssh_command(ip, username, password, "rm -rf " + karafHome + "/*journal")
 
 
 def kill_controller(ip, username, password, karafHome):
-    execute_ssh_command(ip, username, password,
-                        "ps axf | grep karaf | grep -v grep | awk '{print \"kill -9 \" $1}' | sh")
+    execute_ssh_command(
+        ip,
+        username,
+        password,
+        "ps axf | grep karaf | grep -v grep | awk '{print \"kill -9 \" $1}' | sh",
+    )
 
 
 def isolate_controller(controllers, username, password, isolated):
-    """ Isolate one controller from the others in the cluster
+    """Isolate one controller from the others in the cluster
 
     :param controllers: A list of ip addresses or host names as strings.
     :param username: Username for the controller to be isolated.
@@ -228,20 +240,38 @@ def isolate_controller(controllers, username, password, isolated):
     isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
-            base_str = 'sudo iptables -I OUTPUT -p all --source '
-            cmd_str = base_str + isolated_controller + ' --destination ' + controller + ' -j DROP'
+            base_str = "sudo iptables -I OUTPUT -p all --source "
+            cmd_str = (
+                base_str
+                + isolated_controller
+                + " --destination "
+                + controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-            cmd_str = base_str + controller + ' --destination ' + isolated_controller + ' -j DROP'
+            cmd_str = (
+                base_str
+                + controller
+                + " --destination "
+                + isolated_controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-    ip_tables = execute_ssh_command(isolated_controller, username, password, 'sudo iptables -L')
-    print ip_tables
-    iso_result = 'pass'
+    ip_tables = execute_ssh_command(
+        isolated_controller, username, password, "sudo iptables -L"
+    )
+    print(ip_tables)
+    iso_result = "pass"
     for controller in controllers:
-        controller_regex_string = "[\s\S]*" + isolated_controller + " *" + controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + isolated_controller + " *" + controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if not controller_regex.match(ip_tables):
             iso_result = ip_tables
-        controller_regex_string = "[\s\S]*" + controller + " *" + isolated_controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + controller + " *" + isolated_controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if not controller_regex.match(ip_tables):
             iso_result = ip_tables
@@ -249,7 +279,7 @@ def isolate_controller(controllers, username, password, isolated):
 
 
 def rejoin_controller(controllers, username, password, isolated):
-    """ Return an isolated controller to the cluster.
+    """Return an isolated controller to the cluster.
 
     :param controllers: A list of ip addresses or host names as strings.
     :param username: Username for the isolated controller.
@@ -260,20 +290,38 @@ def rejoin_controller(controllers, username, password, isolated):
     isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
-            base_str = 'sudo iptables -D OUTPUT -p all --source '
-            cmd_str = base_str + isolated_controller + ' --destination ' + controller + ' -j DROP'
+            base_str = "sudo iptables -D OUTPUT -p all --source "
+            cmd_str = (
+                base_str
+                + isolated_controller
+                + " --destination "
+                + controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-            cmd_str = base_str + controller + ' --destination ' + isolated_controller + ' -j DROP'
+            cmd_str = (
+                base_str
+                + controller
+                + " --destination "
+                + isolated_controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-    ip_tables = execute_ssh_command(isolated_controller, username, password, 'sudo iptables -L')
-    print ip_tables
-    iso_result = 'pass'
+    ip_tables = execute_ssh_command(
+        isolated_controller, username, password, "sudo iptables -L"
+    )
+    print(ip_tables)
+    iso_result = "pass"
     for controller in controllers:
-        controller_regex_string = "[\s\S]*" + isolated_controller + " *" + controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + isolated_controller + " *" + controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if controller_regex.match(ip_tables):
             iso_result = ip_tables
-        controller_regex_string = "[\s\S]*" + controller + " *" + isolated_controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + controller + " *" + isolated_controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if controller_regex.match(ip_tables):
             iso_result = ip_tables
@@ -288,63 +336,65 @@ def flush_iptables(controllers, username, password):
     :param password: Password for all controllers.
     :return: If successful, returns "pass", otherwise returns "fail".
     """
-    flush_result = 'pass'
+    flush_result = "pass"
     for controller in controllers:
-        print 'Flushing ' + controller
-        cmd_str = 'sudo iptables -v -F'
+        print("Flushing ", controller)
+        cmd_str = "sudo iptables -v -F"
         cmd_result = execute_ssh_command(controller, username, password, cmd_str)
-        print cmd_result
+        print(cmd_result)
         success_string = "Flushing chain `INPUT'" + "\n"
         success_string += "Flushing chain `FORWARD'" + "\n"
         success_string += "Flushing chain `OUTPUT'"
         if not cmd_result == success_string:
             flush_result = "Failed to flush IPTables. Check Log."
-        print "."
-        print "."
-        print "."
+        print(".")
+        print(".")
+        print(".")
     return flush_result
 
 
 def build_elastic_search_JSON_request(query_String):
-    data = {'from': '0',
-            'size': '1',
-            'sort': [{'TimeStamp': {'order': 'desc'}}],
-            'query': {'query_string': {'query': query_String}}}
+    data = {
+        "from": "0",
+        "size": "1",
+        "sort": [{"TimeStamp": {"order": "desc"}}],
+        "query": {"query_string": {"query": query_String}},
+    }
     return json.dumps(data)
 
 
 def create_query_string_search(data_category, metric_name, node_id, rk_node_id):
-    query = 'TSDRDataCategory:'
+    query = "TSDRDataCategory:"
     query += data_category
-    query += ' AND MetricName:'
+    query += " AND MetricName:"
     query += metric_name
-    query += ' AND NodeID:\"'
+    query += ' AND NodeID:"'
     query += node_id
-    query += '\" AND RecordKeys.KeyValue:\"'
+    query += '" AND RecordKeys.KeyValue:"'
     query += rk_node_id
-    query += '\" AND RecordKeys.KeyName:Node AND RecordKeys.KeyValue:0 AND RecordKeys.KeyName:Table'
+    query += '" AND RecordKeys.KeyName:Node AND RecordKeys.KeyValue:0 AND RecordKeys.KeyName:Table'
     return query
 
 
 def create_query_string_count(data_category):
-    query = 'TSDRDataCategory:'
+    query = "TSDRDataCategory:"
     query += data_category
     return query
 
 
 def extract_metric_value_search(response):
-    return str(response['hits']['hits'][0]['_source']['MetricValue'])
+    return str(response["hits"]["hits"][0]["_source"]["MetricValue"])
 
 
 def extract_metric_value_count(response):
-    return int(response['hits']['total'])
+    return int(response["hits"]["total"])
 
 
 #
 # main invoked
 if __name__ != "__main__":
-    _cache = robot.utils.ConnectionCache('No sessions created')
+    _cache = robot.utils.ConnectionCache("No sessions created")
     # here create one session for each HTTP functions
-    _cache.register(requests.session(), alias='CLUSTERING_GET')
-    _cache.register(requests.session(), alias='CLUSTERING_POST')
-    _cache.register(requests.session(), alias='CLUSTERING_DELETE')
+    _cache.register(requests.session(), alias="CLUSTERING_GET")
+    _cache.register(requests.session(), alias="CLUSTERING_POST")
+    _cache.register(requests.session(), alias="CLUSTERING_DELETE")