65cef56a2bb9624a7b30d847b5932d1c79f82d14
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / common / actor / MessageTracker.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.common.actor;
10
11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
12 import static java.util.concurrent.TimeUnit.NANOSECONDS;
13
14 import com.google.common.annotations.Beta;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import com.google.common.base.Stopwatch;
19 import com.google.common.base.Ticker;
20 import com.google.common.collect.ImmutableList;
21 import java.util.LinkedList;
22 import java.util.List;
23 import javax.annotation.concurrent.NotThreadSafe;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
29  * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
30  * received between the arrival of two instances of the same message and the amount of time it took to process each
31  * of those messages.
32  * <br>
33  * Usage of the API is as follows,
34  * <pre>
35  *
36  *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
37  *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
38  *
39  *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
40  *     // will do nothing
41  *     tracker.begin();
42  *
43  *     .....
44  *
45  *     try (MessageTracker.Context context = tracker.received(message)) {
46  *
47  *         if (context.error().isPresent()){
48  *             LOG.error("{}", context.error().get());
49  *         }
50  *
51  *         // Some custom processing
52  *         process(message);
53  *     }
54  *
55  * </pre>
56  */
57 @Beta
58 @NotThreadSafe
59 public final class MessageTracker {
60     public abstract static class Context implements AutoCloseable {
61         Context() {
62             // Hidden to prevent outside instantiation
63         }
64
65         public abstract Optional<Error> error();
66
67         @Override
68         public abstract void close();
69     }
70
71     public interface Error {
72         Object getLastExpectedMessage();
73
74         Object getCurrentExpectedMessage();
75
76         List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
77     }
78
79
80     public static final class MessageProcessingTime {
81         private final Class<?> messageClass;
82         private final long elapsedTimeInNanos;
83
84         MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
85             this.messageClass = Preconditions.checkNotNull(messageClass);
86             this.elapsedTimeInNanos = elapsedTimeInNanos;
87         }
88
89         @Override
90         public String toString() {
91             return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis="
92                    + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]";
93         }
94
95
96         public Class<?> getMessageClass() {
97             return messageClass;
98         }
99
100         public long getElapsedTimeInNanos() {
101             return elapsedTimeInNanos;
102         }
103     }
104
105     private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
106     private static final Context NO_OP_CONTEXT = new Context() {
107         @Override
108         public void close() {
109             // No-op
110         }
111
112         @Override
113         public Optional<Error> error() {
114             return Optional.absent();
115         }
116     };
117
118     private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
119
120     private final CurrentMessageContext currentMessageContext;
121
122     private final Stopwatch expectedMessageWatch;
123
124     private final Class<?> expectedMessageClass;
125
126     private final long expectedArrivalInterval;
127
128     private final Ticker ticker;
129
130     private Object lastExpectedMessage;
131
132     @VisibleForTesting
133     MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
134             final Ticker ticker) {
135         Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
136         this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
137         this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
138         this.ticker = Preconditions.checkNotNull(ticker);
139         this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
140         this.currentMessageContext = new CurrentMessageContext();
141     }
142
143     /**
144      * Constructs an instance.
145      *
146      * @param expectedMessageClass the class of the message to track
147      * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected
148      *                                        message
149      */
150     public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
151         this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
152     }
153
154     public void begin() {
155         if (!expectedMessageWatch.isRunning()) {
156             LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
157             expectedMessageWatch.start();
158         }
159     }
160
161     public Context received(final Object message) {
162         if (!expectedMessageWatch.isRunning()) {
163             return NO_OP_CONTEXT;
164         }
165
166         if (expectedMessageClass.isInstance(message)) {
167             final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
168             if (actualElapsedTime > expectedArrivalInterval) {
169                 return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
170                         messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
171             }
172             lastExpectedMessage = message;
173             messagesSinceLastExpectedMessage.clear();
174             expectedMessageWatch.reset().start();
175         }
176
177         currentMessageContext.reset(message);
178         return currentMessageContext;
179     }
180
181     void processed(final Object message, final long messageElapseTimeInNanos) {
182         if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
183             messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
184                 messageElapseTimeInNanos));
185         }
186     }
187
188     public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
189         return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
190     }
191
192     private static final class FailedExpectation implements Error {
193         private final Object lastExpectedMessage;
194         private final Object currentExpectedMessage;
195         private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
196         private final long expectedTimeInMillis;
197         private final long actualTimeInMillis;
198
199         FailedExpectation(final Object lastExpectedMessage, final Object message,
200                 final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
201                 final long actualTimeNanos) {
202             this.lastExpectedMessage = lastExpectedMessage;
203             this.currentExpectedMessage = message;
204             this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
205             this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
206             this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
207         }
208
209         @Override
210         public Object getLastExpectedMessage() {
211             return lastExpectedMessage;
212         }
213
214         @Override
215         public Object getCurrentExpectedMessage() {
216             return currentExpectedMessage;
217         }
218
219         @Override
220         public List<MessageProcessingTime>  getMessageProcessingTimesSinceLastExpectedMessage() {
221             return messagesSinceLastExpectedMessage;
222         }
223
224         @Override
225         public String toString() {
226             StringBuilder builder = new StringBuilder();
227             builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
228             builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
229             builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
230             builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
231             for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
232                 builder.append("\n\t> ").append(time);
233             }
234             return builder.toString();
235         }
236     }
237
238     private abstract class AbstractTimedContext extends Context {
239         abstract Object message();
240
241         abstract Stopwatch stopTimer();
242
243         @Override
244         public final void close() {
245             processed(message(), stopTimer().elapsed(NANOSECONDS));
246         }
247     }
248
249     private final class CurrentMessageContext extends AbstractTimedContext {
250         private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
251         private Object message;
252
253         void reset(final Object newMessage) {
254             this.message = Preconditions.checkNotNull(newMessage);
255             Preconditions.checkState(!stopwatch.isRunning(),
256                 "Trying to reset a context that is not done (%s). currentMessage = %s", this, newMessage);
257             stopwatch.start();
258         }
259
260         @Override
261         Object message() {
262             return message;
263         }
264
265         @Override
266         Stopwatch stopTimer() {
267             return stopwatch.stop();
268         }
269
270         @Override
271         public Optional<Error> error() {
272             return Optional.absent();
273         }
274     }
275
276     private final class ErrorContext extends AbstractTimedContext {
277         private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
278         private final Object message;
279         private final Error error;
280
281         ErrorContext(final Object message, final Error error) {
282             this.message = Preconditions.checkNotNull(message);
283             this.error = Preconditions.checkNotNull(error);
284         }
285
286         @Override
287         Object message() {
288             return message;
289         }
290
291         @Override
292         Stopwatch stopTimer() {
293             return stopwatch.stop();
294         }
295
296         @Override
297         public Optional<Error> error() {
298             return Optional.of(error);
299         }
300     }
301 }