08516252fd601608f661f44293efe105de914033
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / common / actor / MessageTracker.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.common.actor;
10
11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
12 import static java.util.concurrent.TimeUnit.NANOSECONDS;
13 import com.google.common.annotations.Beta;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.base.Stopwatch;
18 import com.google.common.base.Ticker;
19 import com.google.common.collect.ImmutableList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import javax.annotation.concurrent.NotThreadSafe;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
28  * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
29  * received between the arrival of two instances of the same message and the amount of time it took to process each
30  * of those messages.
31  * <br/>
32  * Usage of the API is as follows,
33  * <pre>
34  *
35  *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
36  *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
37  *
38  *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
39  *     // will do nothing
40  *     tracker.begin();
41  *
42  *     .....
43  *
44  *     MessageTracker.Context context = tracker.received(message);
45  *
46  *     if(context.error().isPresent()){
47  *         LOG.error("{}", context.error().get());
48  *     }
49  *
50  *     // Some custom processing
51  *     process(message);
52  *
53  *     context.done();
54  *
55  * </pre>
56  */
57 @Beta
58 @NotThreadSafe
59 public final class MessageTracker {
60     public static abstract class Context {
61         Context() {
62             // Hidden to prevent outside instantiation
63         }
64
65         public abstract Context done();
66         public abstract Optional<Error> error();
67     }
68
69     public interface Error {
70         Object getLastExpectedMessage();
71         Object getCurrentExpectedMessage();
72         List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
73     }
74
75
76     public static final class MessageProcessingTime {
77         private final Class<?> messageClass;
78         private final long elapsedTimeInNanos;
79
80         MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
81             this.messageClass = Preconditions.checkNotNull(messageClass);
82             this.elapsedTimeInNanos = elapsedTimeInNanos;
83         }
84
85         @Override
86         public String toString() {
87             return "MessageProcessingTime{" +
88                     "messageClass=" + messageClass.getSimpleName() +
89                     ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) +
90                     '}';
91         }
92
93         public Class<?> getMessageClass() {
94             return messageClass;
95         }
96
97         public long getElapsedTimeInNanos() {
98             return elapsedTimeInNanos;
99         }
100     }
101
102     private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
103     private static final Context NO_OP_CONTEXT = new Context() {
104         @Override
105         public Context done() {
106             return this;
107         }
108
109         @Override
110         public Optional<Error> error() {
111             return Optional.absent();
112         }
113     };
114
115     private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
116
117     private final CurrentMessageContext currentMessageContext;
118
119     private final Stopwatch expectedMessageWatch;
120
121     private final Class<?> expectedMessageClass;
122
123     private final long expectedArrivalInterval;
124
125     private final Ticker ticker;
126
127     private Object lastExpectedMessage;
128
129     @VisibleForTesting
130     MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
131             final Ticker ticker) {
132         Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
133         this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
134         this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
135         this.ticker = Preconditions.checkNotNull(ticker);
136         this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
137         this.currentMessageContext = new CurrentMessageContext();
138     }
139
140     /**
141      *
142      * @param expectedMessageClass The class of the message to track
143      * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
144      *                                        message
145      */
146     public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
147         this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
148     }
149
150     public void begin() {
151         if (!expectedMessageWatch.isRunning()) {
152             LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
153             expectedMessageWatch.start();
154         }
155     }
156
157     public Context received(final Object message) {
158         if (!expectedMessageWatch.isRunning()) {
159             return NO_OP_CONTEXT;
160         }
161
162         if (expectedMessageClass.isInstance(message)) {
163             final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
164             if (actualElapsedTime > expectedArrivalInterval) {
165                 return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
166                         messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
167             }
168             lastExpectedMessage = message;
169             messagesSinceLastExpectedMessage.clear();
170             expectedMessageWatch.reset().start();
171         }
172
173         currentMessageContext.reset(message);
174         return currentMessageContext;
175     }
176
177     void processed(final Object message, final long messageElapseTimeInNanos) {
178         if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
179             messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
180                 messageElapseTimeInNanos));
181         }
182     }
183
184     public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
185         return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
186     }
187
188     private static final class FailedExpectation implements Error {
189         private final Object lastExpectedMessage;
190         private final Object currentExpectedMessage;
191         private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
192         private final long expectedTimeInMillis;
193         private final long actualTimeInMillis;
194
195         FailedExpectation(final Object lastExpectedMessage, final Object message,
196                 final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
197                 final long actualTimeNanos) {
198             this.lastExpectedMessage = lastExpectedMessage;
199             this.currentExpectedMessage = message;
200             this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
201             this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
202             this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
203         }
204
205         @Override
206         public Object getLastExpectedMessage() {
207             return lastExpectedMessage;
208         }
209
210         @Override
211         public Object getCurrentExpectedMessage() {
212             return currentExpectedMessage;
213         }
214
215         @Override
216         public List<MessageProcessingTime>  getMessageProcessingTimesSinceLastExpectedMessage() {
217             return messagesSinceLastExpectedMessage;
218         }
219
220         @Override
221         public String toString() {
222             StringBuilder builder = new StringBuilder();
223             builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
224             builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
225             builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
226             builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
227             for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
228                 builder.append("\n\t> ").append(time);
229             }
230             return builder.toString();
231         }
232     }
233
234     private abstract class AbstractTimedContext extends Context {
235         abstract Object message();
236         abstract Stopwatch stopTimer();
237
238         @Override
239         public final Context done() {
240             processed(message(), stopTimer().elapsed(NANOSECONDS));
241             return this;
242         }
243     }
244
245     private final class CurrentMessageContext extends AbstractTimedContext {
246         private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
247         private Object message;
248
249         void reset(final Object message) {
250             this.message = Preconditions.checkNotNull(message);
251             Preconditions.checkState(!stopwatch.isRunning(),
252                 "Trying to reset a context that is not done (%s). currentMessage = %s", this, message);
253             stopwatch.start();
254         }
255
256         @Override
257         Object message() {
258             return message;
259         }
260
261         @Override
262         Stopwatch stopTimer() {
263             return stopwatch.stop();
264         }
265
266         @Override
267         public Optional<Error> error() {
268             return Optional.absent();
269         }
270     }
271
272     private final class ErrorContext extends AbstractTimedContext {
273         private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
274         private final Object message;
275         private final Error error;
276
277         ErrorContext(final Object message, final Error error) {
278             this.message = Preconditions.checkNotNull(message);
279             this.error = Preconditions.checkNotNull(error);
280         }
281
282         @Override
283         Object message() {
284             return message;
285         }
286
287         @Override
288         Stopwatch stopTimer() {
289             return stopwatch.stop();
290         }
291
292         @Override
293         public Optional<Error> error() {
294             return Optional.of(error);
295         }
296     }
297 }