Make MessageTracker.Context implement AutoCloseable
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / common / actor / MessageTracker.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.common.actor;
10
11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
12 import static java.util.concurrent.TimeUnit.NANOSECONDS;
13 import com.google.common.annotations.Beta;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.base.Stopwatch;
18 import com.google.common.base.Ticker;
19 import com.google.common.collect.ImmutableList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import javax.annotation.concurrent.NotThreadSafe;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
28  * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
29  * received between the arrival of two instances of the same message and the amount of time it took to process each
30  * of those messages.
31  * <br/>
32  * Usage of the API is as follows,
33  * <pre>
34  *
35  *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
36  *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
37  *
38  *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
39  *     // will do nothing
40  *     tracker.begin();
41  *
42  *     .....
43  *
44  *     try (MessageTracker.Context context = tracker.received(message)) {
45  *
46  *         if (context.error().isPresent()){
47  *             LOG.error("{}", context.error().get());
48  *         }
49  *
50  *         // Some custom processing
51  *         process(message);
52  *     }
53  *
54  * </pre>
55  */
56 @Beta
57 @NotThreadSafe
58 public final class MessageTracker {
59     public static abstract class Context implements AutoCloseable {
60         Context() {
61             // Hidden to prevent outside instantiation
62         }
63
64         public abstract Optional<Error> error();
65
66         @Override
67         public abstract void close();
68     }
69
70     public interface Error {
71         Object getLastExpectedMessage();
72         Object getCurrentExpectedMessage();
73         List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
74     }
75
76
77     public static final class MessageProcessingTime {
78         private final Class<?> messageClass;
79         private final long elapsedTimeInNanos;
80
81         MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
82             this.messageClass = Preconditions.checkNotNull(messageClass);
83             this.elapsedTimeInNanos = elapsedTimeInNanos;
84         }
85
86         @Override
87         public String toString() {
88             return "MessageProcessingTime{" +
89                     "messageClass=" + messageClass.getSimpleName() +
90                     ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) +
91                     '}';
92         }
93
94         public Class<?> getMessageClass() {
95             return messageClass;
96         }
97
98         public long getElapsedTimeInNanos() {
99             return elapsedTimeInNanos;
100         }
101     }
102
103     private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
104     private static final Context NO_OP_CONTEXT = new Context() {
105         @Override
106         public void close() {
107             // No-op
108         }
109
110         @Override
111         public Optional<Error> error() {
112             return Optional.absent();
113         }
114     };
115
116     private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
117
118     private final CurrentMessageContext currentMessageContext;
119
120     private final Stopwatch expectedMessageWatch;
121
122     private final Class<?> expectedMessageClass;
123
124     private final long expectedArrivalInterval;
125
126     private final Ticker ticker;
127
128     private Object lastExpectedMessage;
129
130     @VisibleForTesting
131     MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
132             final Ticker ticker) {
133         Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
134         this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
135         this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
136         this.ticker = Preconditions.checkNotNull(ticker);
137         this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
138         this.currentMessageContext = new CurrentMessageContext();
139     }
140
141     /**
142      *
143      * @param expectedMessageClass The class of the message to track
144      * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
145      *                                        message
146      */
147     public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
148         this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
149     }
150
151     public void begin() {
152         if (!expectedMessageWatch.isRunning()) {
153             LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
154             expectedMessageWatch.start();
155         }
156     }
157
158     public Context received(final Object message) {
159         if (!expectedMessageWatch.isRunning()) {
160             return NO_OP_CONTEXT;
161         }
162
163         if (expectedMessageClass.isInstance(message)) {
164             final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
165             if (actualElapsedTime > expectedArrivalInterval) {
166                 return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
167                         messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
168             }
169             lastExpectedMessage = message;
170             messagesSinceLastExpectedMessage.clear();
171             expectedMessageWatch.reset().start();
172         }
173
174         currentMessageContext.reset(message);
175         return currentMessageContext;
176     }
177
178     void processed(final Object message, final long messageElapseTimeInNanos) {
179         if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
180             messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
181                 messageElapseTimeInNanos));
182         }
183     }
184
185     public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
186         return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
187     }
188
189     private static final class FailedExpectation implements Error {
190         private final Object lastExpectedMessage;
191         private final Object currentExpectedMessage;
192         private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
193         private final long expectedTimeInMillis;
194         private final long actualTimeInMillis;
195
196         FailedExpectation(final Object lastExpectedMessage, final Object message,
197                 final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
198                 final long actualTimeNanos) {
199             this.lastExpectedMessage = lastExpectedMessage;
200             this.currentExpectedMessage = message;
201             this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
202             this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
203             this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
204         }
205
206         @Override
207         public Object getLastExpectedMessage() {
208             return lastExpectedMessage;
209         }
210
211         @Override
212         public Object getCurrentExpectedMessage() {
213             return currentExpectedMessage;
214         }
215
216         @Override
217         public List<MessageProcessingTime>  getMessageProcessingTimesSinceLastExpectedMessage() {
218             return messagesSinceLastExpectedMessage;
219         }
220
221         @Override
222         public String toString() {
223             StringBuilder builder = new StringBuilder();
224             builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
225             builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
226             builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
227             builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
228             for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
229                 builder.append("\n\t> ").append(time);
230             }
231             return builder.toString();
232         }
233     }
234
235     private abstract class AbstractTimedContext extends Context {
236         abstract Object message();
237         abstract Stopwatch stopTimer();
238
239         @Override
240         public final void close() {
241             processed(message(), stopTimer().elapsed(NANOSECONDS));
242         }
243     }
244
245     private final class CurrentMessageContext extends AbstractTimedContext {
246         private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
247         private Object message;
248
249         void reset(final Object message) {
250             this.message = Preconditions.checkNotNull(message);
251             Preconditions.checkState(!stopwatch.isRunning(),
252                 "Trying to reset a context that is not done (%s). currentMessage = %s", this, message);
253             stopwatch.start();
254         }
255
256         @Override
257         Object message() {
258             return message;
259         }
260
261         @Override
262         Stopwatch stopTimer() {
263             return stopwatch.stop();
264         }
265
266         @Override
267         public Optional<Error> error() {
268             return Optional.absent();
269         }
270     }
271
272     private final class ErrorContext extends AbstractTimedContext {
273         private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
274         private final Object message;
275         private final Error error;
276
277         ErrorContext(final Object message, final Error error) {
278             this.message = Preconditions.checkNotNull(message);
279             this.error = Preconditions.checkNotNull(error);
280         }
281
282         @Override
283         Object message() {
284             return message;
285         }
286
287         @Override
288         Stopwatch stopTimer() {
289             return stopwatch.stop();
290         }
291
292         @Override
293         public Optional<Error> error() {
294             return Optional.of(error);
295         }
296     }
297 }