Fix docs to use real function name
[integration/test.git] / csit / libraries / Appenders / ElasticsearchAppender.py
"""
    Appender object definitions for use with karaf-decanter.
    Used to collect resource usage metrics.

    Currently implements ElasticsearchAppender.

    Usage
            declare foo = ElasticsearchAppender()
            get a connection object: conn = foo.get_connection(host=host,
                                                               port=port)
            call
                    foo.get_jvm_memory(conn), foo.get_jvm_classloading(conn),
                    foo.get_jvm_threading(conn),
                    foo.get_jvm_garbagecollector(conn),
                    foo.get_jvm_operatingsystem(conn)
            returns
                    the latest resource usage statistics dictionary object
                    (latest based on the @timestamp)
            call
                    foo.plot_points(conn, title, filename, metric,
                                    submetric, submetrickey)

                    for example
                    foo.plot_points(conn, 'JVM Started Threads',
                                    'threadcount.png', 'Threading',
                                    'TotalStartedThreadCount')
                    submetrickey is optional
                    for more usage and examples see https://goo.gl/dT1RqT

"""
31
32 from datetime import datetime
33 from operator import itemgetter
34 from elasticsearch import Elasticsearch
35 from elasticsearch_dsl import Search
36 import re
37 import matplotlib as mpl
# Select matplotlib's non-interactive Agg backend so plot_points can render
# straight to image files. This runs at import time, before pyplot is first
# imported (plot_points imports pyplot lazily inside the method).
mpl.use('Agg')
39
40
class MBeanNotFoundError(Exception):
    '''
        Raised when the requested mbean data cannot be fetched.

        message : human-readable description (becomes str(exc))
        errors  : optional underlying cause, kept on the instance as
                  ``errors``; defaults to None so the exception can be
                  raised with a message alone.
    '''

    def __init__(self, message, errors=None):
        super(MBeanNotFoundError, self).__init__(message)
        self.errors = errors
44
45
class BaseAppender(object):
    '''
        Base Appender from which all appenders should inherit.

        Subclasses must override the hooks below. The signatures here match
        the ones ElasticsearchAppender implements and calls.
    '''

    def _get_index(self, connection, need_all=False):
        '''Return the index name(s) to query through connection.'''
        raise NotImplementedError

    def _get_connection(self, host, port):
        '''Return a client object connected to host:port.'''
        raise NotImplementedError
56
57
class ElasticsearchAppender(BaseAppender):
    '''
        ElasticsearchAppender Class
        Metrics supported : Memory, ClassLoading, OperatingSystem, Threading,
                            GarbageCollector
        Individual resource attributes as defined in attr dictionary object
    '''

    # Attributes read from each document, keyed by the ObjectName queried.
    attr = {'Memory': ['HeapMemoryUsage', 'NonHeapMemoryUsage',
                       '@timestamp'],
            'ClassLoading': ['TotalLoadedClassCount', 'UnloadedClassCount',
                             '@timestamp'],
            'OperatingSystem': ['FreeSwapSpaceSize', 'TotalSwapSpaceSize',
                                'FreePhysicalMemorySize',
                                'TotalPhysicalMemorySize',
                                'CommittedVirtualMemorySize', 'ProcessCpuLoad',
                                'ProcessCpuTime', 'SystemCpuLoad',
                                '@timestamp'],
            'Threading': ['DaemonThreadCount', 'PeakThreadCount',
                          'ThreadCount', 'TotalStartedThreadCount',
                          '@timestamp'],
            'GarbageCollector': ['LastGcInfo', 'CollectionCount',
                                 '@timestamp', 'CollectionTime']}
    # Display names for some mbeans (not used by the methods below).
    label = {'Memory': 'Memory', 'ClassLoading': 'Class Loading',
             'Threading': 'Threads', 'GarbageCollector': 'Garbage Collector'}

    def get_connection(self, host='localhost', port=9200):
        '''
            Return an Elasticsearch client for host:port.

            Both values go through cleanse_string, so the port reaches the
            client as a string.
        '''
        host = self.cleanse_string(host)
        port = self.cleanse_string(port)
        return self._get_connection(host, port)

    def get_jvm_memory(self, connection):
        '''Return the latest Memory mbean attributes as a dict.'''
        return self._get_mbean_attr(connection, 'Memory')

    def get_jvm_classloading(self, connection):
        '''Return the latest ClassLoading mbean attributes as a dict.'''
        return self._get_mbean_attr(connection, 'ClassLoading')

    def get_jvm_threading(self, connection):
        '''Return the latest Threading mbean attributes as a dict.'''
        return self._get_mbean_attr(connection, 'Threading')

    def get_jvm_garbagecollector(self, connection):
        '''Return the latest GarbageCollector mbean attributes as a dict.'''
        return self._get_mbean_attr(connection, 'GarbageCollector')

    def get_jvm_operatingsystem(self, connection):
        '''Return the latest OperatingSystem mbean attributes as a dict.'''
        return self._get_mbean_attr(connection, 'OperatingSystem')

    def cleanse_string(self, s):
        '''Return str(s) with every single-quote character removed.'''
        return str(s).replace("'", "")

    def plot_points(self, connection, title, filename, metric, submetric,
                    submetrickey=None):
        '''
            Plot every stored sample of one metric over time and save it.

            connection   : client returned by get_connection
            title        : chart title
            filename     : output image path handed to savefig
            metric       : mbean ObjectName, e.g. 'Threading'
            submetric    : attribute of that mbean, e.g. 'ThreadCount'
            submetrickey : key to read inside the attribute when its value
                           is a dict (optional)

            Raises MBeanNotFoundError when no samples are found.
        '''
        from matplotlib import dates, pyplot as plt, ticker as tkr

        metric = self.cleanse_string(metric)
        submetric = self.cleanse_string(submetric)
        if submetrickey is not None:
            submetrickey = self.cleanse_string(submetrickey)

        points = self._get_plot_points(connection, metric, submetric,
                                       submetrickey)
        if not points:
            raise MBeanNotFoundError(
                'No data points found for %s mbean' % metric, None)
        # Drop microseconds; the tick labels only show whole seconds.
        times = [p.replace(microsecond=0) for p in points[0]]
        values = points[1]

        fig, ax = plt.subplots()
        try:
            ax.plot_date(times, values, 'c-')
            ax.grid(color='grey')
            ax.patch.set_facecolor('black')
            ax.xaxis.set_major_formatter(
                dates.DateFormatter('%m-%d %H:%M:%S'))

            ax.set_xlabel('Time')
            ylabel = self._convert(submetric).title()
            if submetrickey is not None:
                ylabel = ylabel + ' : ' + str(submetrickey).title()
            ax.set_ylabel(ylabel)

            # Pad the y-limits by a tiny fraction of the extremes.
            top = max(values)
            bottom = min(values)
            ax.set_ylim(bottom - bottom * 0.00001, top + top * 0.00001)

            ax.set_title(title)
            # Integer series get integer tick labels; anything else floats.
            cast = int if isinstance(values[0], int) else float
            ax.yaxis.set_major_formatter(
                tkr.FuncFormatter(lambda x, _: cast(x)))
            fig.autofmt_xdate()
            fig.savefig(filename, bbox_inches='tight')
        finally:
            # pyplot keeps every figure alive until it is explicitly closed.
            plt.close(fig)

    def _convert(self, name):
        '''
            Split a CamelCase name into lower-case words, e.g.
            'TotalStartedThreadCount' -> 'total started thread count'.
        '''
        s1 = re.sub('(.)([A-Z][a-z]+)', r'\1 \2', name)
        return re.sub('([a-z0-9])([A-Z])', r'\1 \2', s1).lower()

    def _get_y_val(self, response, metric, submetric=None):
        '''
            Return response[metric], or response[metric][submetric] when
            that value is a dict.
        '''
        if isinstance(response[metric], dict):
            return response[metric][submetric]
        return response[metric]

    def _get_plot_points(self, connection, metric, submetric,
                         submetrickey=None):
        '''
            Collect (datetime, value) samples of metric/submetric from every
            karaf index, ordered oldest first.

            Returns a list [times, values] of two tuples, or an empty list
            when there are no samples. A list (not a zip iterator) is
            returned so callers can index it on Python 3.
        '''
        points = []
        for index in self._get_index(connection, need_all=True):
            responses = self._get_all_mbean_attr(connection, metric, index)
            for response in responses:
                points.append(
                    (self._get_datetime_object(response['@timestamp']),
                     self._get_y_val(response, submetric, submetrickey)))
        points.sort(key=itemgetter(0))
        return list(zip(*points))

    def _get_index(self, connection, need_all=False):
        '''
            Return the names of indices starting with 'karaf'.

            need_all=True  : all of them, sorted ascending.
            need_all=False : only the lexically greatest one (raises
                             IndexError when none exist).
        '''
        indices = sorted(i for i in connection.indices.get_mapping().keys()
                         if i.startswith('karaf'))
        if need_all:
            return indices
        return indices[-1]

    def _get_connection(self, host, port):
        '''Build an Elasticsearch client for a single host/port pair.'''
        con_obj = {'host': host, 'port': port}
        return Elasticsearch([con_obj])

    def _get_all_mbean_attr(self, connection, mbean, index, dsl_class='match'):
        '''
            Return one attribute dict per document in index whose ObjectName
            matches mbean.

            NOTE(review): scan() may not honour the sort below; callers that
            need ordering (e.g. _get_plot_points) sort by timestamp anyway.
        '''
        search = Search(using=connection, index=index).\
            query(dsl_class, ObjectName=mbean).\
            sort({"@timestamp": {"order": 'desc'}})
        return [self._get_attr_obj([hit], mbean) for hit in search.scan()]

    def _get_mbean_attr(self, connection, mbean, dsl_class='match'):
        '''
            Return the attribute dict of the newest mbean document in the
            latest karaf index.

            Raises MBeanNotFoundError (carrying the original exception as
            its errors argument) if the search fails.
        '''
        index = self._get_index(connection)

        try:
            s = Search(using=connection, index=index).\
                query(dsl_class, ObjectName=mbean).\
                sort({"@timestamp": {"order": 'desc'}})[0].execute()
        except Exception as err:
            # Pass the cause explicitly: the exception's constructor takes
            # (message, errors).
            raise MBeanNotFoundError('Could Not Fetch %s mbean' % mbean, err)

        return self._get_attr_obj(s, mbean)

    def _get_attr_obj(self, response, mbean):
        '''
            Copy the attributes listed in attr[mbean] out of each hit in
            response into a single dict (later hits overwrite earlier ones).

            Values exposing a callable to_dict() (presumably elasticsearch_dsl
            wrapper objects) are converted to plain dicts.
        '''
        mbean_attr = {}
        for r in response:
            for k in self.attr[mbean]:
                to_dict = getattr(r[k], "to_dict", None)
                mbean_attr[k] = r[k].to_dict() if callable(to_dict) else r[k]
        return mbean_attr

    def _get_datetime_object(self, timestamp):
        '''
            Parse a timestamp like '2017-01-01T12:00:00,123Z' (comma before
            the fractional seconds) into a naive datetime.
        '''
        return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S,%fZ')