csit/libraries/Appenders/ElasticsearchAppender.py (integration/test.git)
"""
    Appender object definitions to be used with karaf-decanter.
    Used to collect resource usage metrics.

    Currently implements ElasticsearchAppender.

    Usage
            create the appender
                    foo = ElasticsearchAppender()
            get a connection object
                    conn = foo.get_connection(host=host, port=port)
            call
                    foo.get_jvm_memory(conn), foo.get_jvm_classloading(conn),
                    foo.get_jvm_threading(conn),
                    foo.get_jvm_garbagecollector(conn),
                    foo.get_jvm_operatingsystem(conn)
            returns
                    the latest resource usage statistics as a dictionary
                    (latest based on the @timestamp field)
            call
                    foo.plot_points(conn, title, filename, metric,
                                    submetric, submetrickey)

                    for example
                    foo.plot_points(conn, 'JVM Started Threads',
                                    'threadcount.png', 'Threading',
                                    'TotalStartedThreadCount')
                    submetrickey is optional
                    for more usage and examples see https://goo.gl/dT1RqT

"""

from datetime import datetime
from operator import itemgetter
import re

import matplotlib as mpl
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

# Use a non-interactive backend so plots can be rendered without a display.
mpl.use("Agg")


class MBeanNotFoundError(Exception):
    def __init__(self, message, errors=None):
        super(MBeanNotFoundError, self).__init__(message)
        self.errors = errors


class BaseAppender(object):
    """
    Base Appender from which all appenders should inherit
    """

    def _get_index(self, need_all):
        raise NotImplementedError

    def _get_connection(self):
        raise NotImplementedError


class ElasticsearchAppender(BaseAppender):
    """
    ElasticsearchAppender Class
    Metrics supported: Memory, ClassLoading, Threading, GarbageCollector,
    OperatingSystem
    Individual resource attributes are defined in the attr dictionary below
    """

    attr = {
        "Memory": ["HeapMemoryUsage", "NonHeapMemoryUsage", "@timestamp"],
        "ClassLoading": ["TotalLoadedClassCount", "UnloadedClassCount", "@timestamp"],
        "OperatingSystem": [
            "FreeSwapSpaceSize",
            "TotalSwapSpaceSize",
            "FreePhysicalMemorySize",
            "TotalPhysicalMemorySize",
            "CommittedVirtualMemorySize",
            "ProcessCpuLoad",
            "ProcessCpuTime",
            "SystemCpuLoad",
            "@timestamp",
        ],
        "Threading": [
            "DaemonThreadCount",
            "PeakThreadCount",
            "ThreadCount",
            "TotalStartedThreadCount",
            "@timestamp",
        ],
        "GarbageCollector": [
            "LastGcInfo",
            "CollectionCount",
            "@timestamp",
            "CollectionTime",
        ],
    }
    label = {
        "Memory": "Memory",
        "ClassLoading": "Class Loading",
        "Threading": "Threads",
        "GarbageCollector": "Garbage Collector",
    }

    def get_connection(self, host="localhost", port=9200):
        host = self.cleanse_string(host)
        port = self.cleanse_string(port)
        return self._get_connection(host, port)

    def get_jvm_memory(self, connection):
        return self._get_mbean_attr(connection, "Memory")

    def get_jvm_classloading(self, connection):
        return self._get_mbean_attr(connection, "ClassLoading")

    def get_jvm_threading(self, connection):
        return self._get_mbean_attr(connection, "Threading")

    def get_jvm_garbagecollector(self, connection):
        return self._get_mbean_attr(connection, "GarbageCollector")

    def get_jvm_operatingsystem(self, connection):
        return self._get_mbean_attr(connection, "OperatingSystem")

    def cleanse_string(self, s):
        return str(s).replace("'", "")

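    # Illustrative note (an assumption, not from the original docs): each
    # getter above returns the attributes of the newest matching document as
    # a dict keyed by the names in ``attr``, e.g. roughly
    #   {"HeapMemoryUsage": {"init": ..., "used": ..., "committed": ..., "max": ...},
    #    "NonHeapMemoryUsage": {...},
    #    "@timestamp": "2017-01-01T00:00:00,000Z"}
    # The keys inside composite values depend on what karaf-decanter ships to
    # Elasticsearch; the JMX MemoryUsage field names shown here are
    # illustrative only.
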
    def plot_points(
        self, connection, title, filename, metric, submetric, submetrickey=None
    ):
        """Plot the submetric (or submetrickey within it) over time and save to filename."""

        from matplotlib import dates, pyplot as plt, ticker as tkr

        metric = self.cleanse_string(metric)
        submetric = self.cleanse_string(submetric)
        if submetrickey is not None:
            submetrickey = self.cleanse_string(submetrickey)

        points = self._get_plot_points(connection, metric, submetric, submetrickey)
        points[0] = [p.replace(microsecond=0) for p in points[0]]
        myFmt = dates.DateFormatter("%m-%d %H:%M:%S")
        fig, ax = plt.subplots()

        ax.plot_date(points[0], points[1], "c-")
        ax.grid(color="grey")
        ax.patch.set_facecolor("black")
        ax.xaxis.set_major_formatter(myFmt)

        axes = plt.gca()
        axes.get_yaxis().get_major_formatter().set_scientific(False)
        axes.get_yaxis().get_major_formatter().set_useOffset(False)

        ax.set_xlabel("Time")
        xlabel = self._convert(submetric).title()
        if submetrickey is not None:
            xlabel = xlabel + " : " + str(submetrickey).title()
        ax.set_ylabel(xlabel)

        mx = max(points[1]) + max(points[1]) * 0.00001
        mn = min(points[1]) - min(points[1]) * 0.00001
        ax.set_ylim(mn, mx)

        ax.set_title(title)
        if isinstance(points[1][0], int):
            axes.yaxis.set_major_formatter(tkr.FuncFormatter(lambda x, _: int(x)))
        else:
            axes.yaxis.set_major_formatter(tkr.FuncFormatter(lambda x, _: float(x)))
        plt.gcf().autofmt_xdate()
        plt.savefig(filename, bbox_inches="tight")

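    # Illustrative example (mirrors the module docstring): plot heap usage
    # over time by drilling into the HeapMemoryUsage composite value. The
    # submetrickey "used" is an assumption about the document shape shipped
    # by karaf-decanter.
    #
    #   appender.plot_points(conn, "JVM Heap Used", "heapused.png",
    #                        "Memory", "HeapMemoryUsage", "used")
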
    def _convert(self, name):
        # Split a CamelCase attribute name into space-separated lower case words.
        s1 = re.sub("(.)([A-Z][a-z]+)", r"\1 \2", name)
        return re.sub("([a-z0-9])([A-Z])", r"\1 \2", s1).lower()

    def _get_y_val(self, response, metric, submetric=None):
        if isinstance(response[metric], dict):
            return response[metric][submetric]
        else:
            return response[metric]

    def _get_plot_points(self, connection, metric, submetric, submetrickey=None):
        indices = self._get_index(connection, need_all=True)
        points = []
        for index in indices:
            responses = self._get_all_mbean_attr(connection, metric, index)
            for response in responses:
                point = (
                    self._get_datetime_object(response["@timestamp"]),
                    self._get_y_val(response, submetric, submetrickey),
                )
                points.append(point)
        points.sort(key=itemgetter(0))
        # Transpose to (timestamps, values); wrap in list() so the result is
        # subscriptable under Python 3, where zip() returns a lazy iterator.
        return list(zip(*points))

    def _get_index(self, connection, need_all=False):
        indices = sorted(
            [
                i
                for i in connection.indices.get_mapping().keys()
                if i.startswith("karaf")
            ]
        )
        if need_all:
            return indices
        else:
            return sorted(indices, reverse=True)[0]

    def _get_connection(self, host, port):
        con_obj = {"host": host, "port": port}
        es = Elasticsearch([con_obj])
        return es

    def _get_all_mbean_attr(self, connection, mbean, index, dsl_class="match"):
        s = (
            Search(using=connection, index=index)
            .query(dsl_class, ObjectName=mbean)
            .sort({"@timestamp": {"order": "desc"}})
        )
        response = []
        for hit in s.scan():
            response.append(self._get_attr_obj([hit], mbean))
        return response

    def _get_mbean_attr(self, connection, mbean, dsl_class="match"):
        index = self._get_index(connection)

        try:
            s = (
                Search(using=connection, index=index)
                .query(dsl_class, ObjectName=mbean)
                .sort({"@timestamp": {"order": "desc"}})[0]
                .execute()
            )
        except Exception:
            raise MBeanNotFoundError("Could Not Fetch %s mbean" % mbean)

        mem_attr = self._get_attr_obj(s, mbean)
        return mem_attr

    def _get_attr_obj(self, response, mbean):
        mbean_attr = {}
        for r in response:
            for k in self.attr[mbean]:
                is_to_dict = getattr(r[k], "to_dict", None)
                if callable(is_to_dict):
                    mbean_attr[k] = r[k].to_dict()
                else:
                    mbean_attr[k] = r[k]
        return mbean_attr

    def _get_datetime_object(self, timestamp):
        return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S,%fZ")
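

if __name__ == "__main__":
    # Minimal usage sketch, not part of the library proper. Assumes an
    # Elasticsearch instance fed by karaf-decanter is reachable on
    # localhost:9200 and that at least one "karaf*" index already exists.
    appender = ElasticsearchAppender()
    conn = appender.get_connection(host="localhost", port=9200)

    # Latest JVM threading statistics as a dictionary.
    print(appender.get_jvm_threading(conn))

    # Plot the total started thread count over time (the example from the
    # module docstring) and save it as a PNG.
    appender.plot_points(
        conn,
        "JVM Started Threads",
        "threadcount.png",
        "Threading",
        "TotalStartedThreadCount",
    )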