Get rid of deprecation warnings
[integration/test.git] / csit / libraries / UtilLibrary.py
index e244c334956332de5fd624dc749c52ffe16e8654..6f65fd2061595c2be806a6bebeacd36dce56d465 100644 (file)
@@ -17,14 +17,14 @@ __email__ = "syedbahm@cisco.com"
 global _cache
 
 
 global _cache
 
 
-def get(url, userId='admin', password='admin'):
+def get(url, userId="admin", password="admin"):
     """Helps in making GET REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
     """Helps in making GET REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
     headers = {}
     )
     headers = {}
-    headers['Accept'] = 'application/xml'
+    headers["Accept"] = "application/xml"
 
     # Send the GET request
     session = _cache.switch("CLUSTERING_GET")
 
     # Send the GET request
     session = _cache.switch("CLUSTERING_GET")
@@ -38,21 +38,23 @@ def nonprintpost(url, userId, password, data):
     """Helps in making POST REST calls without outputs"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
     """Helps in making POST REST calls without outputs"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
 
     if userId is None:
     )
 
     if userId is None:
-        userId = 'admin'
+        userId = "admin"
 
     if password is None:
 
     if password is None:
-        password = 'admin'
+        password = "admin"
 
     headers = {}
 
     headers = {}
-    headers['Content-Type'] = 'application/json'
+    headers["Content-Type"] = "application/json"
     # headers['Accept']= 'application/xml'
 
     session = _cache.switch("CLUSTERING_POST")
     # headers['Accept']= 'application/xml'
 
     session = _cache.switch("CLUSTERING_POST")
-    resp = session.post(url, data.encode('utf-8'), headers=headers, auth=(userId, password))
+    resp = session.post(
+        url, data.encode("utf-8"), headers=headers, auth=(userId, password)
+    )
 
     return resp
 
 
     return resp
 
@@ -61,22 +63,24 @@ def post(url, userId, password, data):
     """Helps in making POST REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
     """Helps in making POST REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
 
     if userId is None:
     )
 
     if userId is None:
-        userId = 'admin'
+        userId = "admin"
 
     if password is None:
 
     if password is None:
-        password = 'admin'
+        password = "admin"
 
     print("post request with url " + url)
     print("post request with data " + data)
     headers = {}
 
     print("post request with url " + url)
     print("post request with data " + data)
     headers = {}
-    headers['Content-Type'] = 'application/json'
+    headers["Content-Type"] = "application/json"
     # headers['Accept'] = 'application/xml'
     session = _cache.switch("CLUSTERING_POST")
     # headers['Accept'] = 'application/xml'
     session = _cache.switch("CLUSTERING_POST")
-    resp = session.post(url, data.encode('utf-8'), headers=headers, auth=(userId, password))
+    resp = session.post(
+        url, data.encode("utf-8"), headers=headers, auth=(userId, password)
+    )
 
     # print(resp.raise_for_status())
     print(resp.headers)
 
     # print(resp.raise_for_status())
     print(resp.headers)
@@ -86,11 +90,11 @@ def post(url, userId, password, data):
     return resp
 
 
     return resp
 
 
-def delete(url, userId='admin', password='admin'):
+def delete(url, userId="admin", password="admin"):
     """Helps in making DELET REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
     """Helps in making DELET REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
     print("delete all resources belonging to url" + url)
     session = _cache.switch("CLUSTERING_DELETE")
     )
     print("delete all resources belonging to url" + url)
     session = _cache.switch("CLUSTERING_DELETE")
@@ -98,12 +102,12 @@ def delete(url, userId='admin', password='admin'):
 
 
 def Should_Not_Be_Type_None(var):
 
 
 def Should_Not_Be_Type_None(var):
-    '''Keyword to check if the given variable is of type NoneType.  If the
-        variable type does match  raise an assertion so the keyword will fail
-    '''
+    """Keyword to check if the given variable is of type NoneType.  If the
+    variable type does match  raise an assertion so the keyword will fail
+    """
     if var is None:
     if var is None:
-        raise AssertionError('the variable passed was type NoneType')
-    return 'PASS'
+        raise AssertionError("the variable passed was type NoneType")
+    return "PASS"
 
 
 def execute_ssh_command(ip, username, password, command):
 
 
 def execute_ssh_command(ip, username, password, command):
@@ -112,37 +116,42 @@ def execute_ssh_command(ip, username, password, command):
     use username and password of controller server for ssh and need
     karaf distribution location like /root/Documents/dist
     """
     use username and password of controller server for ssh and need
     karaf distribution location like /root/Documents/dist
     """
-    print "executing ssh command"
+    print("executing ssh command")
     lib = SSHLibrary()
     lib.open_connection(ip)
     lib.login(username=username, password=password)
     lib = SSHLibrary()
     lib.open_connection(ip)
     lib.login(username=username, password=password)
-    print "login done"
+    print("login done")
     cmd_response = lib.execute_command(command)
     cmd_response = lib.execute_command(command)
-    print "command executed : " + command
+    print("command executed : " + command)
     lib.close_connection()
     return cmd_response
 
 
 def wait_for_controller_up(ip, port="8181"):
     lib.close_connection()
     return cmd_response
 
 
 def wait_for_controller_up(ip, port="8181"):
-    url = "http://" + ip + ":" + str(port) + \
-          "/restconf/config/opendaylight-inventory:nodes/node/controller-config/yang-ext:mount/config:modules"
+    url = (
+        "http://"
+        + ip
+        + ":"
+        + str(port)
+        + "/restconf/config/opendaylight-inventory:nodes/node/controller-config/yang-ext:mount/config:modules"
+    )
 
 
-    print "Waiting for controller " + ip + " up."
+    print("Waiting for controller " + ip + " up.")
     # Try 30*10s=5 minutes for the controller to be up.
     for i in xrange(30):
         try:
     # Try 30*10s=5 minutes for the controller to be up.
     for i in xrange(30):
         try:
-            print "attempt " + str(i) + " to url " + url
+            print("attempt %s to url %s" % (str(i), url))
             resp = get(url, "admin", "admin")
             resp = get(url, "admin", "admin")
-            print "attempt " + str(i) + " response is " + str(resp)
-            print resp.text
-            if ('clustering-it-provider' in resp.text):
-                print "Wait for controller " + ip + " succeeded"
+            print("attempt %s response is %s" % (str(i), str(resp)))
+            print(resp.text)
+            if "clustering-it-provider" in resp.text:
+                print("Wait for controller " + ip + " succeeded")
                 return True
         except Exception as e:
                 return True
         except Exception as e:
-            print e
+            print(e)
         time.sleep(10)
 
         time.sleep(10)
 
-    print "Wait for controller " + ip + " failed"
+    print("Wait for controller " + ip + " failed")
     return False
 
 
     return False
 
 
@@ -192,10 +201,9 @@ def wait_for_controller_stopped(ip, username, password, karafHome):
     i = 1
     while i <= tries:
         stdout = lib.execute_command("ps -axf | grep karaf | grep -v grep | wc -l")
     i = 1
     while i <= tries:
         stdout = lib.execute_command("ps -axf | grep karaf | grep -v grep | wc -l")
-        # print "stdout: "+stdout
-        processCnt = stdout[0].strip('\n')
+        processCnt = stdout[0].strip("\n")
         print("processCnt: " + processCnt)
         print("processCnt: " + processCnt)
-        if processCnt == '0':
+        if processCnt == "0":
             break
         i = i + 1
         time.sleep(3)
             break
         i = i + 1
         time.sleep(3)
@@ -203,21 +211,25 @@ def wait_for_controller_stopped(ip, username, password, karafHome):
     lib.close_connection()
 
     if i > tries:
     lib.close_connection()
 
     if i > tries:
-        print "Killing controller"
+        print("Killing controller")
         kill_controller(ip, username, password, karafHome)
 
 
 def clean_journal(ip, username, password, karafHome):
         kill_controller(ip, username, password, karafHome)
 
 
 def clean_journal(ip, username, password, karafHome):
-    execute_ssh_command(ip, username, password, "rm -rf " + karafHome + "/journal")
+    execute_ssh_command(ip, username, password, "rm -rf " + karafHome + "/*journal")
 
 
 def kill_controller(ip, username, password, karafHome):
 
 
 def kill_controller(ip, username, password, karafHome):
-    execute_ssh_command(ip, username, password,
-                        "ps axf | grep karaf | grep -v grep | awk '{print \"kill -9 \" $1}' | sh")
+    execute_ssh_command(
+        ip,
+        username,
+        password,
+        "ps axf | grep karaf | grep -v grep | awk '{print \"kill -9 \" $1}' | sh",
+    )
 
 
 def isolate_controller(controllers, username, password, isolated):
 
 
 def isolate_controller(controllers, username, password, isolated):
-    """ Isolate one controller from the others in the cluster
+    """Isolate one controller from the others in the cluster
 
     :param controllers: A list of ip addresses or host names as strings.
     :param username: Username for the controller to be isolated.
 
     :param controllers: A list of ip addresses or host names as strings.
     :param username: Username for the controller to be isolated.
@@ -228,20 +240,38 @@ def isolate_controller(controllers, username, password, isolated):
     isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
     isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
-            base_str = 'sudo iptables -I OUTPUT -p all --source '
-            cmd_str = base_str + isolated_controller + ' --destination ' + controller + ' -j DROP'
+            base_str = "sudo iptables -I OUTPUT -p all --source "
+            cmd_str = (
+                base_str
+                + isolated_controller
+                + " --destination "
+                + controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-            cmd_str = base_str + controller + ' --destination ' + isolated_controller + ' -j DROP'
+            cmd_str = (
+                base_str
+                + controller
+                + " --destination "
+                + isolated_controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-    ip_tables = execute_ssh_command(isolated_controller, username, password, 'sudo iptables -L')
-    print ip_tables
-    iso_result = 'pass'
+    ip_tables = execute_ssh_command(
+        isolated_controller, username, password, "sudo iptables -L"
+    )
+    print(ip_tables)
+    iso_result = "pass"
     for controller in controllers:
     for controller in controllers:
-        controller_regex_string = "[\s\S]*" + isolated_controller + " *" + controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + isolated_controller + " *" + controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if not controller_regex.match(ip_tables):
             iso_result = ip_tables
         controller_regex = re.compile(controller_regex_string)
         if not controller_regex.match(ip_tables):
             iso_result = ip_tables
-        controller_regex_string = "[\s\S]*" + controller + " *" + isolated_controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + controller + " *" + isolated_controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if not controller_regex.match(ip_tables):
             iso_result = ip_tables
         controller_regex = re.compile(controller_regex_string)
         if not controller_regex.match(ip_tables):
             iso_result = ip_tables
@@ -249,7 +279,7 @@ def isolate_controller(controllers, username, password, isolated):
 
 
 def rejoin_controller(controllers, username, password, isolated):
 
 
 def rejoin_controller(controllers, username, password, isolated):
-    """ Return an isolated controller to the cluster.
+    """Return an isolated controller to the cluster.
 
     :param controllers: A list of ip addresses or host names as strings.
     :param username: Username for the isolated controller.
 
     :param controllers: A list of ip addresses or host names as strings.
     :param username: Username for the isolated controller.
@@ -260,20 +290,38 @@ def rejoin_controller(controllers, username, password, isolated):
     isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
     isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
-            base_str = 'sudo iptables -D OUTPUT -p all --source '
-            cmd_str = base_str + isolated_controller + ' --destination ' + controller + ' -j DROP'
+            base_str = "sudo iptables -D OUTPUT -p all --source "
+            cmd_str = (
+                base_str
+                + isolated_controller
+                + " --destination "
+                + controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-            cmd_str = base_str + controller + ' --destination ' + isolated_controller + ' -j DROP'
+            cmd_str = (
+                base_str
+                + controller
+                + " --destination "
+                + isolated_controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-    ip_tables = execute_ssh_command(isolated_controller, username, password, 'sudo iptables -L')
-    print ip_tables
-    iso_result = 'pass'
+    ip_tables = execute_ssh_command(
+        isolated_controller, username, password, "sudo iptables -L"
+    )
+    print(ip_tables)
+    iso_result = "pass"
     for controller in controllers:
     for controller in controllers:
-        controller_regex_string = "[\s\S]*" + isolated_controller + " *" + controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + isolated_controller + " *" + controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if controller_regex.match(ip_tables):
             iso_result = ip_tables
         controller_regex = re.compile(controller_regex_string)
         if controller_regex.match(ip_tables):
             iso_result = ip_tables
-        controller_regex_string = "[\s\S]*" + controller + " *" + isolated_controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + controller + " *" + isolated_controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if controller_regex.match(ip_tables):
             iso_result = ip_tables
         controller_regex = re.compile(controller_regex_string)
         if controller_regex.match(ip_tables):
             iso_result = ip_tables
@@ -288,63 +336,65 @@ def flush_iptables(controllers, username, password):
     :param password: Password for all controllers.
     :return: If successful, returns "pass", otherwise returns "fail".
     """
     :param password: Password for all controllers.
     :return: If successful, returns "pass", otherwise returns "fail".
     """
-    flush_result = 'pass'
+    flush_result = "pass"
     for controller in controllers:
     for controller in controllers:
-        print 'Flushing ' + controller
-        cmd_str = 'sudo iptables -v -F'
+        print("Flushing ", controller)
+        cmd_str = "sudo iptables -v -F"
         cmd_result = execute_ssh_command(controller, username, password, cmd_str)
         cmd_result = execute_ssh_command(controller, username, password, cmd_str)
-        print cmd_result
+        print(cmd_result)
         success_string = "Flushing chain `INPUT'" + "\n"
         success_string += "Flushing chain `FORWARD'" + "\n"
         success_string += "Flushing chain `OUTPUT'"
         if not cmd_result == success_string:
             flush_result = "Failed to flush IPTables. Check Log."
         success_string = "Flushing chain `INPUT'" + "\n"
         success_string += "Flushing chain `FORWARD'" + "\n"
         success_string += "Flushing chain `OUTPUT'"
         if not cmd_result == success_string:
             flush_result = "Failed to flush IPTables. Check Log."
-        print "."
-        print "."
-        print "."
+        print(".")
+        print(".")
+        print(".")
     return flush_result
 
 
 def build_elastic_search_JSON_request(query_String):
     return flush_result
 
 
 def build_elastic_search_JSON_request(query_String):
-    data = {'from': '0',
-            'size': '1',
-            'sort': [{'TimeStamp': {'order': 'desc'}}],
-            'query': {'query_string': {'query': query_String}}}
+    data = {
+        "from": "0",
+        "size": "1",
+        "sort": [{"TimeStamp": {"order": "desc"}}],
+        "query": {"query_string": {"query": query_String}},
+    }
     return json.dumps(data)
 
 
 def create_query_string_search(data_category, metric_name, node_id, rk_node_id):
     return json.dumps(data)
 
 
 def create_query_string_search(data_category, metric_name, node_id, rk_node_id):
-    query = 'TSDRDataCategory:'
+    query = "TSDRDataCategory:"
     query += data_category
     query += data_category
-    query += ' AND MetricName:'
+    query += " AND MetricName:"
     query += metric_name
     query += metric_name
-    query += ' AND NodeID:\"'
+    query += ' AND NodeID:"'
     query += node_id
     query += node_id
-    query += '\" AND RecordKeys.KeyValue:\"'
+    query += '" AND RecordKeys.KeyValue:"'
     query += rk_node_id
     query += rk_node_id
-    query += '\" AND RecordKeys.KeyName:Node AND RecordKeys.KeyValue:0 AND RecordKeys.KeyName:Table'
+    query += '" AND RecordKeys.KeyName:Node AND RecordKeys.KeyValue:0 AND RecordKeys.KeyName:Table'
     return query
 
 
 def create_query_string_count(data_category):
     return query
 
 
 def create_query_string_count(data_category):
-    query = 'TSDRDataCategory:'
+    query = "TSDRDataCategory:"
     query += data_category
     return query
 
 
 def extract_metric_value_search(response):
     query += data_category
     return query
 
 
 def extract_metric_value_search(response):
-    return str(response['hits']['hits'][0]['_source']['MetricValue'])
+    return str(response["hits"]["hits"][0]["_source"]["MetricValue"])
 
 
 def extract_metric_value_count(response):
 
 
 def extract_metric_value_count(response):
-    return int(response['hits']['total'])
+    return int(response["hits"]["total"])
 
 
 #
 # main invoked
 if __name__ != "__main__":
 
 
 #
 # main invoked
 if __name__ != "__main__":
-    _cache = robot.utils.ConnectionCache('No sessions created')
+    _cache = robot.utils.ConnectionCache("No sessions created")
     # here create one session for each HTTP functions
     # here create one session for each HTTP functions
-    _cache.register(requests.session(), alias='CLUSTERING_GET')
-    _cache.register(requests.session(), alias='CLUSTERING_POST')
-    _cache.register(requests.session(), alias='CLUSTERING_DELETE')
+    _cache.register(requests.session(), alias="CLUSTERING_GET")
+    _cache.register(requests.session(), alias="CLUSTERING_POST")
+    _cache.register(requests.session(), alias="CLUSTERING_DELETE")