Bump odlparent to 5.0.0
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / common / actor / MessageTracker.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.common.actor;
10
11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
12 import static java.util.concurrent.TimeUnit.NANOSECONDS;
13
14 import com.google.common.annotations.Beta;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import com.google.common.base.Stopwatch;
19 import com.google.common.base.Ticker;
20 import com.google.common.collect.ImmutableList;
21 import java.util.LinkedList;
22 import java.util.List;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
28  * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
29  * received between the arrival of two instances of the same message and the amount of time it took to process each
30  * of those messages.
31  * <br>
32  * Usage of the API is as follows,
33  * <pre>
34  *
35  *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
36  *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
37  *
38  *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
39  *     // will do nothing
40  *     tracker.begin();
41  *
42  *     .....
43  *
44  *     try (MessageTracker.Context context = tracker.received(message)) {
45  *
46  *         if (context.error().isPresent()){
47  *             LOG.error("{}", context.error().get());
48  *         }
49  *
50  *         // Some custom processing
51  *         process(message);
52  *     }
53  *
54  * </pre>
55  *
56  * <p>
57  * This class is NOT thread-safe.
58  */
59 @Beta
60 public final class MessageTracker {
61     public abstract static class Context implements AutoCloseable {
62         Context() {
63             // Hidden to prevent outside instantiation
64         }
65
66         public abstract Optional<Error> error();
67
68         @Override
69         public abstract void close();
70     }
71
72     public interface Error {
73         Object getLastExpectedMessage();
74
75         Object getCurrentExpectedMessage();
76
77         List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
78     }
79
80
81     public static final class MessageProcessingTime {
82         private final Class<?> messageClass;
83         private final long elapsedTimeInNanos;
84
85         MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
86             this.messageClass = Preconditions.checkNotNull(messageClass);
87             this.elapsedTimeInNanos = elapsedTimeInNanos;
88         }
89
90         @Override
91         public String toString() {
92             return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis="
93                    + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]";
94         }
95
96
97         public Class<?> getMessageClass() {
98             return messageClass;
99         }
100
101         public long getElapsedTimeInNanos() {
102             return elapsedTimeInNanos;
103         }
104     }
105
106     private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
107     private static final Context NO_OP_CONTEXT = new Context() {
108         @Override
109         public void close() {
110             // No-op
111         }
112
113         @Override
114         public Optional<Error> error() {
115             return Optional.absent();
116         }
117     };
118
119     private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
120
121     private final CurrentMessageContext currentMessageContext;
122
123     private final Stopwatch expectedMessageWatch;
124
125     private final Class<?> expectedMessageClass;
126
127     private final long expectedArrivalInterval;
128
129     private final Ticker ticker;
130
131     private Object lastExpectedMessage;
132
133     @VisibleForTesting
134     MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
135             final Ticker ticker) {
136         Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
137         this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
138         this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
139         this.ticker = Preconditions.checkNotNull(ticker);
140         this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
141         this.currentMessageContext = new CurrentMessageContext();
142     }
143
144     /**
145      * Constructs an instance.
146      *
147      * @param expectedMessageClass the class of the message to track
148      * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected
149      *                                        message
150      */
151     public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
152         this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
153     }
154
155     public void begin() {
156         if (!expectedMessageWatch.isRunning()) {
157             LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
158             expectedMessageWatch.start();
159         }
160     }
161
162     public Context received(final Object message) {
163         if (!expectedMessageWatch.isRunning()) {
164             return NO_OP_CONTEXT;
165         }
166
167         if (expectedMessageClass.isInstance(message)) {
168             final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
169             if (actualElapsedTime > expectedArrivalInterval) {
170                 return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
171                         messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
172             }
173             lastExpectedMessage = message;
174             messagesSinceLastExpectedMessage.clear();
175             expectedMessageWatch.reset().start();
176         }
177
178         currentMessageContext.reset(message);
179         return currentMessageContext;
180     }
181
182     void processed(final Object message, final long messageElapseTimeInNanos) {
183         if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
184             messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
185                 messageElapseTimeInNanos));
186         }
187     }
188
189     public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
190         return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
191     }
192
193     private static final class FailedExpectation implements Error {
194         private final Object lastExpectedMessage;
195         private final Object currentExpectedMessage;
196         private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
197         private final long expectedTimeInMillis;
198         private final long actualTimeInMillis;
199
200         FailedExpectation(final Object lastExpectedMessage, final Object message,
201                 final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
202                 final long actualTimeNanos) {
203             this.lastExpectedMessage = lastExpectedMessage;
204             this.currentExpectedMessage = message;
205             this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
206             this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
207             this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
208         }
209
210         @Override
211         public Object getLastExpectedMessage() {
212             return lastExpectedMessage;
213         }
214
215         @Override
216         public Object getCurrentExpectedMessage() {
217             return currentExpectedMessage;
218         }
219
220         @Override
221         public List<MessageProcessingTime>  getMessageProcessingTimesSinceLastExpectedMessage() {
222             return messagesSinceLastExpectedMessage;
223         }
224
225         @Override
226         public String toString() {
227             StringBuilder builder = new StringBuilder();
228             builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
229             builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
230             builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
231             builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
232             for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
233                 builder.append("\n\t> ").append(time);
234             }
235             return builder.toString();
236         }
237     }
238
239     private abstract class AbstractTimedContext extends Context {
240         abstract Object message();
241
242         abstract Stopwatch stopTimer();
243
244         @Override
245         public final void close() {
246             processed(message(), stopTimer().elapsed(NANOSECONDS));
247         }
248     }
249
250     private final class CurrentMessageContext extends AbstractTimedContext {
251         private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
252         private Object message;
253
254         void reset(final Object newMessage) {
255             this.message = Preconditions.checkNotNull(newMessage);
256             Preconditions.checkState(!stopwatch.isRunning(),
257                 "Trying to reset a context that is not done (%s). currentMessage = %s", this, newMessage);
258             stopwatch.start();
259         }
260
261         @Override
262         Object message() {
263             return message;
264         }
265
266         @Override
267         Stopwatch stopTimer() {
268             return stopwatch.stop();
269         }
270
271         @Override
272         public Optional<Error> error() {
273             return Optional.absent();
274         }
275     }
276
277     private final class ErrorContext extends AbstractTimedContext {
278         private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
279         private final Object message;
280         private final Error error;
281
282         ErrorContext(final Object message, final Error error) {
283             this.message = Preconditions.checkNotNull(message);
284             this.error = Preconditions.checkNotNull(error);
285         }
286
287         @Override
288         Object message() {
289             return message;
290         }
291
292         @Override
293         Stopwatch stopTimer() {
294             return stopwatch.stop();
295         }
296
297         @Override
298         public Optional<Error> error() {
299             return Optional.of(error);
300         }
301     }
302 }