Bump pre-commit black to 22.1.0
[integration/test.git] / csit / libraries / UtilLibrary.py
index c342386f0aa85407410cc7d12333d15156928a58..6f65fd2061595c2be806a6bebeacd36dce56d465 100644 (file)
@@ -17,14 +17,14 @@ __email__ = "syedbahm@cisco.com"
 global _cache
 
 
 global _cache
 
 
-def get(url, userId='admin', password='admin'):
+def get(url, userId="admin", password="admin"):
     """Helps in making GET REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
     """Helps in making GET REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
     headers = {}
     )
     headers = {}
-    headers['Accept'] = 'application/xml'
+    headers["Accept"] = "application/xml"
 
     # Send the GET request
     session = _cache.switch("CLUSTERING_GET")
 
     # Send the GET request
     session = _cache.switch("CLUSTERING_GET")
@@ -38,21 +38,23 @@ def nonprintpost(url, userId, password, data):
     """Helps in making POST REST calls without outputs"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
     """Helps in making POST REST calls without outputs"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
 
     if userId is None:
     )
 
     if userId is None:
-        userId = 'admin'
+        userId = "admin"
 
     if password is None:
 
     if password is None:
-        password = 'admin'
+        password = "admin"
 
     headers = {}
 
     headers = {}
-    headers['Content-Type'] = 'application/json'
+    headers["Content-Type"] = "application/json"
     # headers['Accept']= 'application/xml'
 
     session = _cache.switch("CLUSTERING_POST")
     # headers['Accept']= 'application/xml'
 
     session = _cache.switch("CLUSTERING_POST")
-    resp = session.post(url, data.encode('utf-8'), headers=headers, auth=(userId, password))
+    resp = session.post(
+        url, data.encode("utf-8"), headers=headers, auth=(userId, password)
+    )
 
     return resp
 
 
     return resp
 
@@ -61,22 +63,24 @@ def post(url, userId, password, data):
     """Helps in making POST REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
     """Helps in making POST REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
 
     if userId is None:
     )
 
     if userId is None:
-        userId = 'admin'
+        userId = "admin"
 
     if password is None:
 
     if password is None:
-        password = 'admin'
+        password = "admin"
 
     print("post request with url " + url)
     print("post request with data " + data)
     headers = {}
 
     print("post request with url " + url)
     print("post request with data " + data)
     headers = {}
-    headers['Content-Type'] = 'application/json'
+    headers["Content-Type"] = "application/json"
     # headers['Accept'] = 'application/xml'
     session = _cache.switch("CLUSTERING_POST")
     # headers['Accept'] = 'application/xml'
     session = _cache.switch("CLUSTERING_POST")
-    resp = session.post(url, data.encode('utf-8'), headers=headers, auth=(userId, password))
+    resp = session.post(
+        url, data.encode("utf-8"), headers=headers, auth=(userId, password)
+    )
 
     # print(resp.raise_for_status())
     print(resp.headers)
 
     # print(resp.raise_for_status())
     print(resp.headers)
@@ -86,11 +90,11 @@ def post(url, userId, password, data):
     return resp
 
 
     return resp
 
 
-def delete(url, userId='admin', password='admin'):
+def delete(url, userId="admin", password="admin"):
     """Helps in making DELET REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
     """Helps in making DELET REST calls"""
     warnings.warn(
         "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
-        DeprecationWarning
+        DeprecationWarning,
     )
     print("delete all resources belonging to url" + url)
     session = _cache.switch("CLUSTERING_DELETE")
     )
     print("delete all resources belonging to url" + url)
     session = _cache.switch("CLUSTERING_DELETE")
@@ -98,12 +102,12 @@ def delete(url, userId='admin', password='admin'):
 
 
 def Should_Not_Be_Type_None(var):
 
 
 def Should_Not_Be_Type_None(var):
-    '''Keyword to check if the given variable is of type NoneType.  If the
-        variable type does match  raise an assertion so the keyword will fail
-    '''
+    """Keyword to check if the given variable is of type NoneType.  If the
+    variable type does match  raise an assertion so the keyword will fail
+    """
     if var is None:
     if var is None:
-        raise AssertionError('the variable passed was type NoneType')
-    return 'PASS'
+        raise AssertionError("the variable passed was type NoneType")
+    return "PASS"
 
 
 def execute_ssh_command(ip, username, password, command):
 
 
 def execute_ssh_command(ip, username, password, command):
@@ -124,8 +128,13 @@ def execute_ssh_command(ip, username, password, command):
 
 
 def wait_for_controller_up(ip, port="8181"):
 
 
 def wait_for_controller_up(ip, port="8181"):
-    url = "http://" + ip + ":" + str(port) + \
-          "/restconf/config/opendaylight-inventory:nodes/node/controller-config/yang-ext:mount/config:modules"
+    url = (
+        "http://"
+        + ip
+        + ":"
+        + str(port)
+        + "/restconf/config/opendaylight-inventory:nodes/node/controller-config/yang-ext:mount/config:modules"
+    )
 
     print("Waiting for controller " + ip + " up.")
     # Try 30*10s=5 minutes for the controller to be up.
 
     print("Waiting for controller " + ip + " up.")
     # Try 30*10s=5 minutes for the controller to be up.
@@ -135,7 +144,7 @@ def wait_for_controller_up(ip, port="8181"):
             resp = get(url, "admin", "admin")
             print("attempt %s response is %s" % (str(i), str(resp)))
             print(resp.text)
             resp = get(url, "admin", "admin")
             print("attempt %s response is %s" % (str(i), str(resp)))
             print(resp.text)
-            if ('clustering-it-provider' in resp.text):
+            if "clustering-it-provider" in resp.text:
                 print("Wait for controller " + ip + " succeeded")
                 return True
         except Exception as e:
                 print("Wait for controller " + ip + " succeeded")
                 return True
         except Exception as e:
@@ -192,9 +201,9 @@ def wait_for_controller_stopped(ip, username, password, karafHome):
     i = 1
     while i <= tries:
         stdout = lib.execute_command("ps -axf | grep karaf | grep -v grep | wc -l")
     i = 1
     while i <= tries:
         stdout = lib.execute_command("ps -axf | grep karaf | grep -v grep | wc -l")
-        processCnt = stdout[0].strip('\n')
+        processCnt = stdout[0].strip("\n")
         print("processCnt: " + processCnt)
         print("processCnt: " + processCnt)
-        if processCnt == '0':
+        if processCnt == "0":
             break
         i = i + 1
         time.sleep(3)
             break
         i = i + 1
         time.sleep(3)
@@ -211,12 +220,16 @@ def clean_journal(ip, username, password, karafHome):
 
 
 def kill_controller(ip, username, password, karafHome):
 
 
 def kill_controller(ip, username, password, karafHome):
-    execute_ssh_command(ip, username, password,
-                        "ps axf | grep karaf | grep -v grep | awk '{print \"kill -9 \" $1}' | sh")
+    execute_ssh_command(
+        ip,
+        username,
+        password,
+        "ps axf | grep karaf | grep -v grep | awk '{print \"kill -9 \" $1}' | sh",
+    )
 
 
 def isolate_controller(controllers, username, password, isolated):
 
 
 def isolate_controller(controllers, username, password, isolated):
-    """ Isolate one controller from the others in the cluster
+    """Isolate one controller from the others in the cluster
 
     :param controllers: A list of ip addresses or host names as strings.
     :param username: Username for the controller to be isolated.
 
     :param controllers: A list of ip addresses or host names as strings.
     :param username: Username for the controller to be isolated.
@@ -227,20 +240,38 @@ def isolate_controller(controllers, username, password, isolated):
     isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
     isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
-            base_str = 'sudo iptables -I OUTPUT -p all --source '
-            cmd_str = base_str + isolated_controller + ' --destination ' + controller + ' -j DROP'
+            base_str = "sudo iptables -I OUTPUT -p all --source "
+            cmd_str = (
+                base_str
+                + isolated_controller
+                + " --destination "
+                + controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-            cmd_str = base_str + controller + ' --destination ' + isolated_controller + ' -j DROP'
+            cmd_str = (
+                base_str
+                + controller
+                + " --destination "
+                + isolated_controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-    ip_tables = execute_ssh_command(isolated_controller, username, password, 'sudo iptables -L')
+    ip_tables = execute_ssh_command(
+        isolated_controller, username, password, "sudo iptables -L"
+    )
     print(ip_tables)
     print(ip_tables)
-    iso_result = 'pass'
+    iso_result = "pass"
     for controller in controllers:
     for controller in controllers:
-        controller_regex_string = "[\s\S]*" + isolated_controller + " *" + controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + isolated_controller + " *" + controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if not controller_regex.match(ip_tables):
             iso_result = ip_tables
         controller_regex = re.compile(controller_regex_string)
         if not controller_regex.match(ip_tables):
             iso_result = ip_tables
-        controller_regex_string = "[\s\S]*" + controller + " *" + isolated_controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + controller + " *" + isolated_controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if not controller_regex.match(ip_tables):
             iso_result = ip_tables
         controller_regex = re.compile(controller_regex_string)
         if not controller_regex.match(ip_tables):
             iso_result = ip_tables
@@ -248,7 +279,7 @@ def isolate_controller(controllers, username, password, isolated):
 
 
 def rejoin_controller(controllers, username, password, isolated):
 
 
 def rejoin_controller(controllers, username, password, isolated):
-    """ Return an isolated controller to the cluster.
+    """Return an isolated controller to the cluster.
 
     :param controllers: A list of ip addresses or host names as strings.
     :param username: Username for the isolated controller.
 
     :param controllers: A list of ip addresses or host names as strings.
     :param username: Username for the isolated controller.
@@ -259,20 +290,38 @@ def rejoin_controller(controllers, username, password, isolated):
     isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
     isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
-            base_str = 'sudo iptables -D OUTPUT -p all --source '
-            cmd_str = base_str + isolated_controller + ' --destination ' + controller + ' -j DROP'
+            base_str = "sudo iptables -D OUTPUT -p all --source "
+            cmd_str = (
+                base_str
+                + isolated_controller
+                + " --destination "
+                + controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-            cmd_str = base_str + controller + ' --destination ' + isolated_controller + ' -j DROP'
+            cmd_str = (
+                base_str
+                + controller
+                + " --destination "
+                + isolated_controller
+                + " -j DROP"
+            )
             execute_ssh_command(isolated_controller, username, password, cmd_str)
             execute_ssh_command(isolated_controller, username, password, cmd_str)
-    ip_tables = execute_ssh_command(isolated_controller, username, password, 'sudo iptables -L')
+    ip_tables = execute_ssh_command(
+        isolated_controller, username, password, "sudo iptables -L"
+    )
     print(ip_tables)
     print(ip_tables)
-    iso_result = 'pass'
+    iso_result = "pass"
     for controller in controllers:
     for controller in controllers:
-        controller_regex_string = "[\s\S]*" + isolated_controller + " *" + controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + isolated_controller + " *" + controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if controller_regex.match(ip_tables):
             iso_result = ip_tables
         controller_regex = re.compile(controller_regex_string)
         if controller_regex.match(ip_tables):
             iso_result = ip_tables
-        controller_regex_string = "[\s\S]*" + controller + " *" + isolated_controller + "[\s\S]*"
+        controller_regex_string = (
+            r"[\s\S]*" + controller + " *" + isolated_controller + r"[\s\S]*"
+        )
         controller_regex = re.compile(controller_regex_string)
         if controller_regex.match(ip_tables):
             iso_result = ip_tables
         controller_regex = re.compile(controller_regex_string)
         if controller_regex.match(ip_tables):
             iso_result = ip_tables
@@ -287,10 +336,10 @@ def flush_iptables(controllers, username, password):
     :param password: Password for all controllers.
     :return: If successful, returns "pass", otherwise returns "fail".
     """
     :param password: Password for all controllers.
     :return: If successful, returns "pass", otherwise returns "fail".
     """
-    flush_result = 'pass'
+    flush_result = "pass"
     for controller in controllers:
     for controller in controllers:
-        print('Flushing ', controller)
-        cmd_str = 'sudo iptables -v -F'
+        print("Flushing ", controller)
+        cmd_str = "sudo iptables -v -F"
         cmd_result = execute_ssh_command(controller, username, password, cmd_str)
         print(cmd_result)
         success_string = "Flushing chain `INPUT'" + "\n"
         cmd_result = execute_ssh_command(controller, username, password, cmd_str)
         print(cmd_result)
         success_string = "Flushing chain `INPUT'" + "\n"
@@ -305,45 +354,47 @@ def flush_iptables(controllers, username, password):
 
 
 def build_elastic_search_JSON_request(query_String):
 
 
 def build_elastic_search_JSON_request(query_String):
-    data = {'from': '0',
-            'size': '1',
-            'sort': [{'TimeStamp': {'order': 'desc'}}],
-            'query': {'query_string': {'query': query_String}}}
+    data = {
+        "from": "0",
+        "size": "1",
+        "sort": [{"TimeStamp": {"order": "desc"}}],
+        "query": {"query_string": {"query": query_String}},
+    }
     return json.dumps(data)
 
 
 def create_query_string_search(data_category, metric_name, node_id, rk_node_id):
     return json.dumps(data)
 
 
 def create_query_string_search(data_category, metric_name, node_id, rk_node_id):
-    query = 'TSDRDataCategory:'
+    query = "TSDRDataCategory:"
     query += data_category
     query += data_category
-    query += ' AND MetricName:'
+    query += " AND MetricName:"
     query += metric_name
     query += metric_name
-    query += ' AND NodeID:\"'
+    query += ' AND NodeID:"'
     query += node_id
     query += node_id
-    query += '\" AND RecordKeys.KeyValue:\"'
+    query += '" AND RecordKeys.KeyValue:"'
     query += rk_node_id
     query += rk_node_id
-    query += '\" AND RecordKeys.KeyName:Node AND RecordKeys.KeyValue:0 AND RecordKeys.KeyName:Table'
+    query += '" AND RecordKeys.KeyName:Node AND RecordKeys.KeyValue:0 AND RecordKeys.KeyName:Table'
     return query
 
 
 def create_query_string_count(data_category):
     return query
 
 
 def create_query_string_count(data_category):
-    query = 'TSDRDataCategory:'
+    query = "TSDRDataCategory:"
     query += data_category
     return query
 
 
 def extract_metric_value_search(response):
     query += data_category
     return query
 
 
 def extract_metric_value_search(response):
-    return str(response['hits']['hits'][0]['_source']['MetricValue'])
+    return str(response["hits"]["hits"][0]["_source"]["MetricValue"])
 
 
 def extract_metric_value_count(response):
 
 
 def extract_metric_value_count(response):
-    return int(response['hits']['total'])
+    return int(response["hits"]["total"])
 
 
 #
 # main invoked
 if __name__ != "__main__":
 
 
 #
 # main invoked
 if __name__ != "__main__":
-    _cache = robot.utils.ConnectionCache('No sessions created')
+    _cache = robot.utils.ConnectionCache("No sessions created")
     # here create one session for each HTTP functions
     # here create one session for each HTTP functions
-    _cache.register(requests.session(), alias='CLUSTERING_GET')
-    _cache.register(requests.session(), alias='CLUSTERING_POST')
-    _cache.register(requests.session(), alias='CLUSTERING_DELETE')
+    _cache.register(requests.session(), alias="CLUSTERING_GET")
+    _cache.register(requests.session(), alias="CLUSTERING_POST")
+    _cache.register(requests.session(), alias="CLUSTERING_DELETE")