Append 3 more punt path protection TCs
[integration/test.git] / csit / libraries / UtilLibrary.py
index 05ef69ab374ada992473714c97b64244f737328d..e244c334956332de5fd624dc749c52ffe16e8654 100644 (file)
@@ -1,22 +1,28 @@
-__author__ = "Basheeruddin Ahmed"
-__copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
-__license__ = "New-style BSD"
-__email__ = "syedbahm@cisco.com"
-
-
 import requests
 from SSHLibrary import SSHLibrary
 
 import robot
 import time
 import re
+import json
+import warnings
+
+
+__author__ = "Basheeruddin Ahmed"
+__copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
+__license__ = "New-style BSD"
+__email__ = "syedbahm@cisco.com"
+
 
 global _cache
 
 
 def get(url, userId='admin', password='admin'):
     """Helps in making GET REST calls"""
-
+    warnings.warn(
+        "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
+        DeprecationWarning
+    )
     headers = {}
     headers['Accept'] = 'application/xml'
 
@@ -30,6 +36,10 @@ def get(url, userId='admin', password='admin'):
 
 def nonprintpost(url, userId, password, data):
     """Helps in making POST REST calls without outputs"""
+    warnings.warn(
+        "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
+        DeprecationWarning
+    )
 
     if userId is None:
         userId = 'admin'
@@ -49,6 +59,10 @@ def nonprintpost(url, userId, password, data):
 
 def post(url, userId, password, data):
     """Helps in making POST REST calls"""
+    warnings.warn(
+        "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
+        DeprecationWarning
+    )
 
     if userId is None:
         userId = 'admin'
@@ -56,16 +70,16 @@ def post(url, userId, password, data):
     if password is None:
         password = 'admin'
 
-    print("post request with url "+url)
-    print("post request with data "+data)
+    print("post request with url " + url)
+    print("post request with data " + data)
     headers = {}
     headers['Content-Type'] = 'application/json'
-    # headers['Accept']= 'application/xml'
+    # headers['Accept'] = 'application/xml'
     session = _cache.switch("CLUSTERING_POST")
     resp = session.post(url, data.encode('utf-8'), headers=headers, auth=(userId, password))
 
-    # print (resp.raise_for_status())
-    print (resp.headers)
+    # print(resp.raise_for_status())
+    print(resp.headers)
     if resp.status_code >= 500:
         print(resp.text)
 
@@ -74,7 +88,11 @@ def post(url, userId, password, data):
 
 def delete(url, userId='admin', password='admin'):
     """Helps in making DELET REST calls"""
-    print("delete all resources belonging to url"+url)
+    warnings.warn(
+        "Use the Robot RequestsLibrary rather than this. See DatastoreCRUD.robot for examples",
+        DeprecationWarning
+    )
+    print("delete all resources belonging to url" + url)
     session = _cache.switch("CLUSTERING_DELETE")
     resp = session.delete(url, auth=(userId, password))  # noqa
 
@@ -131,7 +149,7 @@ def wait_for_controller_up(ip, port="8181"):
 def startAllControllers(username, password, karafhome, port, *ips):
     # Start all controllers
     for ip in ips:
-        execute_ssh_command(ip, username, password, karafhome+"/bin/start")
+        execute_ssh_command(ip, username, password, karafhome + "/bin/start")
 
     # Wait for all of them to be up
     for ip in ips:
@@ -153,7 +171,7 @@ def stopcontroller(ip, username, password, karafhome):
 
 
 def executeStopController(ip, username, password, karafhome):
-    execute_ssh_command(ip, username, password, karafhome+"/bin/stop")
+    execute_ssh_command(ip, username, password, karafhome + "/bin/stop")
 
 
 def stopAllControllers(username, password, karafhome, *ips):
@@ -207,7 +225,7 @@ def isolate_controller(controllers, username, password, isolated):
     :param isolated: Number (starting at one) of the controller to be isolated.
     :return: If successful, returns "pass", otherwise returns the last failed IPTables text.
     """
-    isolated_controller = controllers[isolated-1]
+    isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
             base_str = 'sudo iptables -I OUTPUT -p all --source '
@@ -239,7 +257,7 @@ def rejoin_controller(controllers, username, password, isolated):
     :param isolated: Number (starting at one) of the isolated controller isolated.
     :return: If successful, returns "pass", otherwise returns the last failed IPTables text.
     """
-    isolated_controller = controllers[isolated-1]
+    isolated_controller = controllers[isolated - 1]
     for controller in controllers:
         if controller != isolated_controller:
             base_str = 'sudo iptables -D OUTPUT -p all --source '
@@ -287,6 +305,41 @@ def flush_iptables(controllers, username, password):
     return flush_result
 
 
+def build_elastic_search_JSON_request(query_String):
+    data = {'from': '0',
+            'size': '1',
+            'sort': [{'TimeStamp': {'order': 'desc'}}],
+            'query': {'query_string': {'query': query_String}}}
+    return json.dumps(data)
+
+
+def create_query_string_search(data_category, metric_name, node_id, rk_node_id):
+    query = 'TSDRDataCategory:'
+    query += data_category
+    query += ' AND MetricName:'
+    query += metric_name
+    query += ' AND NodeID:\"'
+    query += node_id
+    query += '\" AND RecordKeys.KeyValue:\"'
+    query += rk_node_id
+    query += '\" AND RecordKeys.KeyName:Node AND RecordKeys.KeyValue:0 AND RecordKeys.KeyName:Table'
+    return query
+
+
+def create_query_string_count(data_category):
+    query = 'TSDRDataCategory:'
+    query += data_category
+    return query
+
+
+def extract_metric_value_search(response):
+    return str(response['hits']['hits'][0]['_source']['MetricValue'])
+
+
+def extract_metric_value_count(response):
+    return int(response['hits']['total'])
+
+
 #
 # main invoked
 if __name__ != "__main__":