Set USE_RFC8040 to True
[integration/test.git] / csit / libraries / Appenders / ElasticsearchAppender.py
index fe1d8b573e3419f85aa531ae20f4a280da92f942..60f2b46b7442437e7fdbd5a53bde3e3e68ca2b04 100644 (file)
@@ -5,21 +5,25 @@
     Currently implements ElasticsearchAppender
 
     Usage
-            declare foo = ElasticsearchAppender(hostname, port)
+            declare foo = ElasticsearchAppender()
+            get connection object conn = foo.get_connection(host=host,
+                                                            port=port)
             call
-                    foo.get_jvm_memory(), foo.get_jvm_classloading(),
-                    foo.get_jvm_threading(), foo.get_jvm_garbageCollector()
-                    foo.get_jvm_operatingsystem()
+                    foo.get_jvm_memory(conn), foo.get_jvm_classloading(conn),
+                    foo.get_jvm_threadingconn(),
+                    foo.get_jvm_garbagecollector(conn),
+                    foo.get_jvm_operatingsystem(conn)
             returns
                     the latest resource usage statistics dictionary object
                     (latest based on the @timestamp)
             call
-                    foo.plot_points(title, filename, metric,
+                    foo.plot_points(conn, title, filename, metric,
                                     submetric, submetrickey)
 
                     for example
-                    foo.plot_points('JVM Started Threads', 'threadcount.png',
-                                    'Threading', 'TotalStartedThreadCount')
+                    foo.plot_points(conn, 'JVM Started Threads',
+                                    'threadcount.png', 'Threading',
+                                    'TotalStartedThreadCount')
                     submetrickey is optional
                     for more usage and examples see https://goo.gl/dT1RqT
 
@@ -31,25 +35,19 @@ from elasticsearch import Elasticsearch
 from elasticsearch_dsl import Search
 import re
 import matplotlib as mpl
-mpl.use('Agg')
+
+mpl.use("Agg")
 
 
 class MBeanNotFoundError(Exception):
-        def __init__(self, message, errors):
-            super(MBeanNotFoundError, self).__init__(message)
+    def __init__(self, message, errors):
+        super(MBeanNotFoundError, self).__init__(message)
 
 
 class BaseAppender(object):
-    '''
-        Base Appender from which all appenders should inherit
-    '''
-
-    host = ''
-    port = ''
-
-    def __init__(self, host='localhost', port=9200):
-        self.host = host
-        self.port = port
+    """
+    Base Appender from which all appenders should inherit
+    """
 
     def _get_index(self, need_all):
         raise NotImplementedError
@@ -59,57 +57,73 @@ class BaseAppender(object):
 
 
 class ElasticsearchAppender(BaseAppender):
-    '''
-        ElasticsearchAppender Class
-        Metrics supported : Memory, ClassLoading, Threading, GarbageCollector
-        Individual resource attributes as defined in attr dictionary object
-    '''
-
-    connection = ''
-    attr = {'Memory': ['HeapMemoryUsage', 'NonHeapMemoryUsage',
-                       '@timestamp'],
-            'ClassLoading': ['TotalLoadedClassCount', 'UnloadedClassCount',
-                             '@timestamp'],
-            'OperatingSystem': ['FreeSwapSpaceSize', 'TotalSwapSpaceSize',
-                                'FreePhysicalMemorySize',
-                                'TotalPhysicalMemorySize',
-                                'CommittedVirtualMemorySize', 'ProcessCpuLoad',
-                                'ProcessCpuTime', 'SystemCpuLoad',
-                                '@timestamp'],
-            'Threading': ['DaemonThreadCount', 'PeakThreadCount',
-                          'ThreadCount', 'TotalStartedThreadCount',
-                          '@timestamp'],
-            'GarbageCollector': ['LastGcInfo', 'CollectionCount',
-                                 '@timestamp', 'CollectionTime']}
-    label = {'Memory': 'Memory', 'ClassLoading': 'Class Loading',
-             'Threading': 'Threads', 'GarbageCollector': 'Garbage Collector'}
-
-    def __init__(self, host='localhost', port=9200):
+    """
+    ElasticsearchAppender Class
+    Metrics supported : Memory, ClassLoading, Threading, GarbageCollector
+    Individual resource attributes as defined in attr dictionary object
+    """
+
+    attr = {
+        "Memory": ["HeapMemoryUsage", "NonHeapMemoryUsage", "@timestamp"],
+        "ClassLoading": ["TotalLoadedClassCount", "UnloadedClassCount", "@timestamp"],
+        "OperatingSystem": [
+            "FreeSwapSpaceSize",
+            "TotalSwapSpaceSize",
+            "FreePhysicalMemorySize",
+            "TotalPhysicalMemorySize",
+            "CommittedVirtualMemorySize",
+            "ProcessCpuLoad",
+            "ProcessCpuTime",
+            "SystemCpuLoad",
+            "@timestamp",
+        ],
+        "Threading": [
+            "DaemonThreadCount",
+            "PeakThreadCount",
+            "ThreadCount",
+            "TotalStartedThreadCount",
+            "@timestamp",
+        ],
+        "GarbageCollector": [
+            "LastGcInfo",
+            "CollectionCount",
+            "@timestamp",
+            "CollectionTime",
+        ],
+    }
+    label = {
+        "Memory": "Memory",
+        "ClassLoading": "Class Loading",
+        "Threading": "Threads",
+        "GarbageCollector": "Garbage Collector",
+    }
+
+    def get_connection(self, host="localhost", port=9200):
         host = self.cleanse_string(host)
         port = self.cleanse_string(port)
-        super(ElasticsearchAppender, self).__init__(host, port)
-        self.connection = self._get_connection()
+        return self._get_connection(host, port)
 
-    def get_jvm_memory(self):
-        return self._get_mbean_attr('Memory')
+    def get_jvm_memory(self, connection):
+        return self._get_mbean_attr(connection, "Memory")
 
-    def get_jvm_classloading(self):
-        return self._get_mbean_attr('ClassLoading')
+    def get_jvm_classloading(self, connection):
+        return self._get_mbean_attr(connection, "ClassLoading")
 
-    def get_jvm_threading(self):
-        return self._get_mbean_attr('Threading')
+    def get_jvm_threading(self, connection):
+        return self._get_mbean_attr(connection, "Threading")
 
-    def get_jvm_garbagecollector(self):
-        return self._get_mbean_attr('GarbageCollector')
+    def get_jvm_garbagecollector(self, connection):
+        return self._get_mbean_attr(connection, "GarbageCollector")
 
-    def get_jvm_operatingsystem(self):
-        return self._get_mbean_attr('OperatingSystem')
+    def get_jvm_operatingsystem(self, connection):
+        return self._get_mbean_attr(connection, "OperatingSystem")
 
     def cleanse_string(self, s):
         return str(s).replace("'", "")
 
-    def plot_points(self, title, filename, metric, submetric,
-                    submetrickey=None):
+    def plot_points(
+        self, connection, title, filename, metric, submetric, submetrickey=None
+    ):
 
         from matplotlib import dates, pyplot as plt, ticker as tkr
 
@@ -118,24 +132,24 @@ class ElasticsearchAppender(BaseAppender):
         if submetrickey is not None:
             submetrickey = self.cleanse_string(submetrickey)
 
-        points = self._get_plot_points(metric, submetric, submetrickey)
+        points = self._get_plot_points(connection, metric, submetric, submetrickey)
         points[0] = [p.replace(microsecond=0) for p in points[0]]
-        myFmt = dates.DateFormatter('%m-%d %H:%M:%S')
+        myFmt = dates.DateFormatter("%m-%d %H:%M:%S")
         fig, ax = plt.subplots()
 
-        ax.plot_date(points[0], points[1], 'c-')
-        ax.grid(color='grey')
-        ax.patch.set_facecolor('black')
+        ax.plot_date(points[0], points[1], "c-")
+        ax.grid(color="grey")
+        ax.patch.set_facecolor("black")
         ax.xaxis.set_major_formatter(myFmt)
 
         axes = plt.gca()
         axes.get_yaxis().get_major_formatter().set_scientific(False)
         axes.get_yaxis().get_major_formatter().set_useOffset(False)
 
-        ax.set_xlabel('Time')
+        ax.set_xlabel("Time")
         xlabel = self._convert(submetric).title()
         if submetrickey is not None:
-            xlabel = xlabel + ' : ' + str(submetrickey).title()
+            xlabel = xlabel + " : " + str(submetrickey).title()
         ax.set_ylabel(xlabel)
 
         mx = max(points[1]) + max(points[1]) * 0.00001
@@ -144,17 +158,15 @@ class ElasticsearchAppender(BaseAppender):
 
         ax.set_title(title)
         if isinstance(points[1][0], int):
-            axes.yaxis.set_major_formatter(tkr.FuncFormatter(lambda x, _:
-                                                             int(x)))
+            axes.yaxis.set_major_formatter(tkr.FuncFormatter(lambda x, _: int(x)))
         else:
-            axes.yaxis.set_major_formatter(tkr.FuncFormatter(lambda x, _:
-                                                             float(x)))
+            axes.yaxis.set_major_formatter(tkr.FuncFormatter(lambda x, _: float(x)))
         plt.gcf().autofmt_xdate()
-        plt.savefig(filename, bbox_inches='tight')
+        plt.savefig(filename, bbox_inches="tight")
 
     def _convert(self, name):
-        s1 = re.sub('(.)([A-Z][a-z]+)', r'\1 \2', name)
-        return re.sub('([a-z0-9])([A-Z])', r'\1 \2', s1).lower()
+        s1 = re.sub("(.)([A-Z][a-z]+)", r"\1 \2", name)
+        return re.sub("([a-z0-9])([A-Z])", r"\1 \2", s1).lower()
 
     def _get_y_val(self, response, metric, submetric=None):
         if isinstance(response[metric], dict):
@@ -162,50 +174,61 @@ class ElasticsearchAppender(BaseAppender):
         else:
             return response[metric]
 
-    def _get_plot_points(self, metric, submetric, submetrickey=None):
-        indices = self._get_index(need_all=True)
+    def _get_plot_points(self, connection, metric, submetric, submetrickey=None):
+        indices = self._get_index(connection, need_all=True)
         points = []
         for index in indices:
-            responses = self._get_all_mbean_attr(metric, index)
+            responses = self._get_all_mbean_attr(connection, metric, index)
             for response in responses:
-                point = (self._get_datetime_object(response['@timestamp']),
-                         self._get_y_val(response, submetric, submetrickey))
+                point = (
+                    self._get_datetime_object(response["@timestamp"]),
+                    self._get_y_val(response, submetric, submetrickey),
+                )
                 points.append(point)
         points.sort(key=itemgetter(0))
         return zip(*points)
 
-    def _get_index(self, need_all=False):
-        indices = sorted([i for i in
-                         self.connection.indices.get_mapping().keys()
-                         if i.startswith('karaf')])
+    def _get_index(self, connection, need_all=False):
+        indices = sorted(
+            [
+                i
+                for i in connection.indices.get_mapping().keys()
+                if i.startswith("karaf")
+            ]
+        )
         if need_all:
             return indices
         else:
             return sorted(indices, reverse=True)[0]
 
-    def _get_connection(self):
-        con_obj = {'host': self.host, 'port': self.port}
+    def _get_connection(self, host, port):
+        con_obj = {"host": host, "port": port}
         es = Elasticsearch([con_obj])
         return es
 
-    def _get_all_mbean_attr(self, mbean, index, dsl_class='match'):
-        s = Search(using=self.connection, index=index).\
-            filter(dsl_class, ObjectName=mbean).\
-            sort({"@timestamp": {"order": 'desc'}})
+    def _get_all_mbean_attr(self, connection, mbean, index, dsl_class="match"):
+        s = (
+            Search(using=connection, index=index)
+            .query(dsl_class, ObjectName=mbean)
+            .sort({"@timestamp": {"order": "desc"}})
+        )
         response = []
         for hit in s.scan():
             response.append(self._get_attr_obj([hit], mbean))
         return response
 
-    def _get_mbean_attr(self, mbean, dsl_class='match'):
-        index = self._get_index()
+    def _get_mbean_attr(self, connection, mbean, dsl_class="match"):
+        index = self._get_index(connection)
 
         try:
-            s = Search(using=self.connection, index=index).\
-                filter(dsl_class, ObjectName=mbean).\
-                sort({"@timestamp": {"order": 'desc'}})[0].execute()
+            s = (
+                Search(using=connection, index=index)
+                .query(dsl_class, ObjectName=mbean)
+                .sort({"@timestamp": {"order": "desc"}})[0]
+                .execute()
+            )
         except Exception:
-            raise MBeanNotFoundError('Could Not Fetch %s mbean' % mbean)
+            raise MBeanNotFoundError("Could Not Fetch %s mbean" % mbean)
 
         mem_attr = self._get_attr_obj(s, mbean)
         return mem_attr
@@ -222,4 +245,4 @@ class ElasticsearchAppender(BaseAppender):
         return mbean_attr
 
     def _get_datetime_object(self, timestamp):
-        return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S,%fZ')
+        return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S,%fZ")