2 Appenders Object Definition to be used with karaf-decanter
3 Used to collect resource usage metrics
5 Currently implements ElasticsearchAppender
8 declare foo = ElasticsearchAppender()
9 get connection object conn = foo.get_connection(host=host,
12 foo.get_jvm_memory(conn), foo.get_jvm_classloading(conn),
13 foo.get_jvm_threadingconn(),
14 foo.get_jvm_garbagecollector(conn),
15 foo.get_jvm_operatingsystem(conn)
17 the latest resource usage statistics dictionary object
18 (latest based on the @timestamp)
20 foo.plot_points(conn, title, filename, metric,
21 submetric, submetrickey)
24 foo.plot_points(conn, 'JVM Started Threads',
25 'threadcount.png', 'Threading',
26 'TotalStartedThreadCount')
27 submetrickey is optional
28 for more usage and examples see https://goo.gl/dT1RqT
32 from datetime import datetime
33 from operator import itemgetter
34 from elasticsearch import Elasticsearch
35 from elasticsearch_dsl import Search
37 import matplotlib as mpl
42 class MBeanNotFoundError(Exception):
43 def __init__(self, message, errors):
44 super(MBeanNotFoundError, self).__init__(message)
47 class BaseAppender(object):
49 Base Appender from which all appenders should inherit
52 def _get_index(self, need_all):
53 raise NotImplementedError
55 def _get_connection(self):
56 raise NotImplementedError
59 class ElasticsearchAppender(BaseAppender):
61 ElasticsearchAppender Class
62 Metrics supported : Memory, ClassLoading, Threading, GarbageCollector
63 Individual resource attributes as defined in attr dictionary object
67 "Memory": ["HeapMemoryUsage", "NonHeapMemoryUsage", "@timestamp"],
68 "ClassLoading": ["TotalLoadedClassCount", "UnloadedClassCount", "@timestamp"],
72 "FreePhysicalMemorySize",
73 "TotalPhysicalMemorySize",
74 "CommittedVirtualMemorySize",
84 "TotalStartedThreadCount",
96 "ClassLoading": "Class Loading",
97 "Threading": "Threads",
98 "GarbageCollector": "Garbage Collector",
101 def get_connection(self, host="localhost", port=9200):
102 host = self.cleanse_string(host)
103 port = self.cleanse_string(port)
104 return self._get_connection(host, port)
106 def get_jvm_memory(self, connection):
107 return self._get_mbean_attr(connection, "Memory")
109 def get_jvm_classloading(self, connection):
110 return self._get_mbean_attr(connection, "ClassLoading")
112 def get_jvm_threading(self, connection):
113 return self._get_mbean_attr(connection, "Threading")
115 def get_jvm_garbagecollector(self, connection):
116 return self._get_mbean_attr(connection, "GarbageCollector")
118 def get_jvm_operatingsystem(self, connection):
119 return self._get_mbean_attr(connection, "OperatingSystem")
121 def cleanse_string(self, s):
122 return str(s).replace("'", "")
125 self, connection, title, filename, metric, submetric, submetrickey=None
128 from matplotlib import dates, pyplot as plt, ticker as tkr
130 metric = self.cleanse_string(metric)
131 submetric = self.cleanse_string(submetric)
132 if submetrickey is not None:
133 submetrickey = self.cleanse_string(submetrickey)
135 points = self._get_plot_points(connection, metric, submetric, submetrickey)
136 points[0] = [p.replace(microsecond=0) for p in points[0]]
137 myFmt = dates.DateFormatter("%m-%d %H:%M:%S")
138 fig, ax = plt.subplots()
140 ax.plot_date(points[0], points[1], "c-")
141 ax.grid(color="grey")
142 ax.patch.set_facecolor("black")
143 ax.xaxis.set_major_formatter(myFmt)
146 axes.get_yaxis().get_major_formatter().set_scientific(False)
147 axes.get_yaxis().get_major_formatter().set_useOffset(False)
149 ax.set_xlabel("Time")
150 xlabel = self._convert(submetric).title()
151 if submetrickey is not None:
152 xlabel = xlabel + " : " + str(submetrickey).title()
153 ax.set_ylabel(xlabel)
155 mx = max(points[1]) + max(points[1]) * 0.00001
156 mn = min(points[1]) - min(points[1]) * 0.00001
160 if isinstance(points[1][0], int):
161 axes.yaxis.set_major_formatter(tkr.FuncFormatter(lambda x, _: int(x)))
163 axes.yaxis.set_major_formatter(tkr.FuncFormatter(lambda x, _: float(x)))
164 plt.gcf().autofmt_xdate()
165 plt.savefig(filename, bbox_inches="tight")
167 def _convert(self, name):
168 s1 = re.sub("(.)([A-Z][a-z]+)", r"\1 \2", name)
169 return re.sub("([a-z0-9])([A-Z])", r"\1 \2", s1).lower()
171 def _get_y_val(self, response, metric, submetric=None):
172 if isinstance(response[metric], dict):
173 return response[metric][submetric]
175 return response[metric]
177 def _get_plot_points(self, connection, metric, submetric, submetrickey=None):
178 indices = self._get_index(connection, need_all=True)
180 for index in indices:
181 responses = self._get_all_mbean_attr(connection, metric, index)
182 for response in responses:
184 self._get_datetime_object(response["@timestamp"]),
185 self._get_y_val(response, submetric, submetrickey),
188 points.sort(key=itemgetter(0))
191 def _get_index(self, connection, need_all=False):
195 for i in connection.indices.get_mapping().keys()
196 if i.startswith("karaf")
202 return sorted(indices, reverse=True)[0]
204 def _get_connection(self, host, port):
205 con_obj = {"host": host, "port": port}
206 es = Elasticsearch([con_obj])
209 def _get_all_mbean_attr(self, connection, mbean, index, dsl_class="match"):
211 Search(using=connection, index=index)
212 .query(dsl_class, ObjectName=mbean)
213 .sort({"@timestamp": {"order": "desc"}})
217 response.append(self._get_attr_obj([hit], mbean))
220 def _get_mbean_attr(self, connection, mbean, dsl_class="match"):
221 index = self._get_index(connection)
225 Search(using=connection, index=index)
226 .query(dsl_class, ObjectName=mbean)
227 .sort({"@timestamp": {"order": "desc"}})[0]
231 raise MBeanNotFoundError("Could Not Fetch %s mbean" % mbean)
233 mem_attr = self._get_attr_obj(s, mbean)
236 def _get_attr_obj(self, response, mbean):
239 for k in self.attr[mbean]:
240 is_to_dict = getattr(r[k], "to_dict", None)
241 if callable(is_to_dict):
242 mbean_attr[k] = r[k].to_dict()
247 def _get_datetime_object(self, timestamp):
248 return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S,%fZ")