Adding ElasticsearchAppenderClass & longevitytests
[integration/test.git] / csit / libraries / Appenders / ElasticsearchAppender.py
1 """
2     Appenders Object Definition to be used with karaf-decanter
3     Used to collect resource usage metrics
4
5     Currently implements ElasticsearchAppender
6
7     Usage
8             declare foo = ElasticsearchAppender(hostname, port)
9             call
10                     foo.get_jvm_memory(), foo.get_jvm_classloading(),
11                     foo.get_jvm_threading(), foo.get_jvm_garbagecollector(),
12                     foo.get_jvm_operatingsystem()
13             returns
14                     the latest resource usage statistics dictionary object
15                     (latest based on the @timestamp)
16             call
17                     foo.plot_points(title, filename, metric,
18                                     submetric, submetrickey)
19
20                     for example
21                     foo.plot_points('JVM Started Threads', 'threadcount.png',
22                                     'Threading', 'TotalStartedThreadCount')
23                     submetrickey is optional
24                     for more usage and examples see https://goo.gl/dT1RqT
25
26 """
27
28 from datetime import datetime
29 from operator import itemgetter
30 from elasticsearch import Elasticsearch
31 from elasticsearch_dsl import Search
32 import re
33 import matplotlib as mpl
34 mpl.use('Agg')
35
36
class MBeanNotFoundError(Exception):
    """Raised when a requested MBean document cannot be fetched.

    Args:
        message: Human-readable description of the failure.
        errors: Optional underlying error (or errors) that caused the
            failure. Kept on the instance as ``errors`` so callers can
            inspect the root cause. Defaults to ``None`` so the exception
            can be raised with a message alone.
    """

    def __init__(self, message, errors=None):
        super(MBeanNotFoundError, self).__init__(message)
        self.errors = errors
40
41
class BaseAppender(object):
    """Parent class that every appender is expected to inherit from.

    Records the host and port given at construction time. The two
    underscore-prefixed hooks raise ``NotImplementedError`` here and are
    meant to be overridden by concrete appenders.
    """

    # Class-level fallbacks; __init__ shadows both on every instance.
    host = port = ''

    def __init__(self, host='localhost', port=9200):
        """Remember the host and port this appender talks to."""
        self.host, self.port = host, port

    def _get_index(self, need_all):
        """Hook for resolving the index (or indices) to query."""
        raise NotImplementedError()

    def _get_connection(self):
        """Hook for building the connection object."""
        raise NotImplementedError()
59
60
class ElasticsearchAppender(BaseAppender):
    """Appender that reads karaf-decanter JVM metrics from Elasticsearch.

    ``attr`` maps every supported MBean (Memory, ClassLoading,
    OperatingSystem, Threading, GarbageCollector) to the document fields
    that are copied into the dictionaries returned by the ``get_jvm_*``
    methods and used by ``plot_points``.
    """

    # Placeholder until __init__ stores the real client object.
    connection = ''
    attr = {'Memory': ['HeapMemoryUsage', 'NonHeapMemoryUsage',
                       '@timestamp'],
            'ClassLoading': ['TotalLoadedClassCount', 'UnloadedClassCount',
                             '@timestamp'],
            'OperatingSystem': ['FreeSwapSpaceSize', 'TotalSwapSpaceSize',
                                'FreePhysicalMemorySize',
                                'TotalPhysicalMemorySize',
                                'CommittedVirtualMemorySize', 'ProcessCpuLoad',
                                'ProcessCpuTime', 'SystemCpuLoad',
                                '@timestamp'],
            'Threading': ['DaemonThreadCount', 'PeakThreadCount',
                          'ThreadCount', 'TotalStartedThreadCount',
                          '@timestamp'],
            'GarbageCollector': ['LastGcInfo', 'CollectionCount',
                                 '@timestamp', 'CollectionTime']}
    label = {'Memory': 'Memory', 'ClassLoading': 'Class Loading',
             'Threading': 'Threads', 'GarbageCollector': 'Garbage Collector'}

    def __init__(self, host='localhost', port=9200):
        """Clean up host/port and open the Elasticsearch connection.

        Both values are converted to ``str`` with single quotes removed
        before being stored.
        """
        host = self.cleanse_string(host)
        port = self.cleanse_string(port)
        super(ElasticsearchAppender, self).__init__(host, port)
        self.connection = self._get_connection()

    def get_jvm_memory(self):
        """Return the most recent 'Memory' MBean attributes as a dict."""
        return self._get_mbean_attr('Memory')

    def get_jvm_classloading(self):
        """Return the most recent 'ClassLoading' MBean attributes."""
        return self._get_mbean_attr('ClassLoading')

    def get_jvm_threading(self):
        """Return the most recent 'Threading' MBean attributes."""
        return self._get_mbean_attr('Threading')

    def get_jvm_garbagecollector(self):
        """Return the most recent 'GarbageCollector' MBean attributes."""
        return self._get_mbean_attr('GarbageCollector')

    def get_jvm_operatingsystem(self):
        """Return the most recent 'OperatingSystem' MBean attributes."""
        return self._get_mbean_attr('OperatingSystem')

    def cleanse_string(self, s):
        """Return ``str(s)`` with every single-quote character removed."""
        return str(s).replace("'", "")

    def plot_points(self, title, filename, metric, submetric,
                    submetrickey=None):
        """Plot one MBean attribute over time and save the figure.

        Args:
            title: Title placed on the plot.
            filename: Destination passed to ``savefig``.
            metric: MBean name; must be a key of ``attr``.
            submetric: Attribute of that MBean to plot on the y axis.
            submetrickey: Key to read when the attribute value is a
                dictionary; optional.

        Raises:
            IndexError: if no data points were found for the metric.
        """
        # Imported here so the 'Agg' backend chosen at module import time
        # is already in effect when pyplot is loaded.
        from matplotlib import dates, pyplot as plt, ticker as tkr

        metric = self.cleanse_string(metric)
        submetric = self.cleanse_string(submetric)
        if submetrickey is not None:
            submetrickey = self.cleanse_string(submetrickey)

        points = self._get_plot_points(metric, submetric, submetrickey)
        if not points:
            raise IndexError('No data points found for %s / %s'
                             % (metric, submetric))
        timestamps = [p.replace(microsecond=0) for p in points[0]]
        values = points[1]

        fig, ax = plt.subplots()
        try:
            ax.plot_date(timestamps, values, 'c-')
            ax.grid(color='grey')
            ax.patch.set_facecolor('black')
            ax.xaxis.set_major_formatter(
                dates.DateFormatter('%m-%d %H:%M:%S'))

            y_formatter = ax.get_yaxis().get_major_formatter()
            y_formatter.set_scientific(False)
            y_formatter.set_useOffset(False)

            ax.set_xlabel('Time')
            ylabel = self._convert(submetric).title()
            if submetrickey is not None:
                ylabel = ylabel + ' : ' + str(submetrickey).title()
            ax.set_ylabel(ylabel)

            # Pad the y range slightly. abs() keeps the padding pointing
            # outwards for negative values; non-negative data is unchanged.
            highest = max(values)
            lowest = min(values)
            ax.set_ylim(lowest - abs(lowest) * 0.00001,
                        highest + abs(highest) * 0.00001)

            ax.set_title(title)
            # Tick labels follow the type of the first plotted value.
            if isinstance(values[0], int):
                ax.yaxis.set_major_formatter(
                    tkr.FuncFormatter(lambda x, _: int(x)))
            else:
                ax.yaxis.set_major_formatter(
                    tkr.FuncFormatter(lambda x, _: float(x)))
            fig.autofmt_xdate()
            fig.savefig(filename, bbox_inches='tight')
        finally:
            # pyplot keeps every figure alive until it is closed; without
            # this, repeated calls accumulate figures in memory.
            plt.close(fig)

    def _convert(self, name):
        """Turn a CamelCase name into lower-case, space-separated words.

        For example 'TotalStartedThreadCount' becomes
        'total started thread count'.
        """
        s1 = re.sub('(.)([A-Z][a-z]+)', r'\1 \2', name)
        return re.sub('([a-z0-9])([A-Z])', r'\1 \2', s1).lower()

    def _get_y_val(self, response, metric, submetric=None):
        """Return ``response[metric]``, descending into ``submetric``
        when the value is a dictionary."""
        value = response[metric]
        if isinstance(value, dict):
            return value[submetric]
        return value

    def _get_plot_points(self, metric, submetric, submetrickey=None):
        """Collect (timestamp, value) samples from every karaf index.

        Returns:
            ``[timestamps, values]`` as two parallel lists ordered by
            timestamp, or an empty list when no samples exist.
        """
        points = []
        for index in self._get_index(need_all=True):
            for response in self._get_all_mbean_attr(metric, index):
                points.append(
                    (self._get_datetime_object(response['@timestamp']),
                     self._get_y_val(response, submetric, submetrickey)))
        points.sort(key=itemgetter(0))
        # zip() is a one-shot iterator on Python 3; materialise it so the
        # caller can index into the result.
        return [list(column) for column in zip(*points)]

    def _get_index(self, need_all=False):
        """Return the index names that start with 'karaf'.

        Args:
            need_all: when true return every matching name in ascending
                order; otherwise return only the last name in that order.

        Raises:
            IndexError: if a single index is requested and none match.
        """
        indices = sorted(i for i in
                         self.connection.indices.get_mapping().keys()
                         if i.startswith('karaf'))
        if need_all:
            return indices
        if not indices:
            raise IndexError('No index starting with karaf was found')
        return indices[-1]

    def _get_connection(self):
        """Create an Elasticsearch client for the stored host and port."""
        return Elasticsearch([{'host': self.host, 'port': self.port}])

    def _get_all_mbean_attr(self, mbean, index, dsl_class='match'):
        """Return the attribute dict of every ``mbean`` hit in ``index``."""
        # NOTE(review): scan() may not honour the sort clause; the caller
        # (_get_plot_points) re-sorts by timestamp anyway.
        s = Search(using=self.connection, index=index).\
            filter(dsl_class, ObjectName=mbean).\
            sort({"@timestamp": {"order": 'desc'}})
        return [self._get_attr_obj([hit], mbean) for hit in s.scan()]

    def _get_mbean_attr(self, mbean, dsl_class='match'):
        """Return the attributes of the newest ``mbean`` document.

        Only the single index returned by ``_get_index()`` is queried,
        ordered by '@timestamp' descending and limited to the first hit.

        Raises:
            MBeanNotFoundError: if the search fails for any reason; the
                original exception is passed along as ``errors``.
        """
        index = self._get_index()

        try:
            s = Search(using=self.connection, index=index).\
                filter(dsl_class, ObjectName=mbean).\
                sort({"@timestamp": {"order": 'desc'}})[0].execute()
        except Exception as err:
            # The exception class takes (message, errors); passing only
            # the message would fail with TypeError and hide the cause.
            raise MBeanNotFoundError('Could Not Fetch %s mbean' % mbean, err)

        return self._get_attr_obj(s, mbean)

    def _get_attr_obj(self, response, mbean):
        """Copy the fields listed in ``attr[mbean]`` out of ``response``.

        Values exposing a callable ``to_dict`` are converted with it.
        When ``response`` holds several hits, later hits overwrite
        earlier ones.
        """
        mbean_attr = {}
        for r in response:
            for k in self.attr[mbean]:
                value = r[k]
                to_dict = getattr(value, "to_dict", None)
                mbean_attr[k] = to_dict() if callable(to_dict) else value
        return mbean_attr

    def _get_datetime_object(self, timestamp):
        """Parse a timestamp such as '2016-05-03T12:34:56,789Z' into a
        naive ``datetime``."""
        return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S,%fZ')