Convert MessageTracker to use java.util.Optional
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / common / actor / MessageTracker.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.common.actor;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
13 import static java.util.concurrent.TimeUnit.MILLISECONDS;
14 import static java.util.concurrent.TimeUnit.NANOSECONDS;
15
16 import com.google.common.annotations.Beta;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.base.Stopwatch;
19 import com.google.common.base.Ticker;
20 import com.google.common.collect.ImmutableList;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Optional;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
29  * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
30  * received between the arrival of two instances of the same message and the amount of time it took to process each
31  * of those messages.
32  * <br>
33  * Usage of the API is as follows,
34  * <pre>
35  *
36  *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
37  *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
38  *
39  *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
40  *     // will do nothing
41  *     tracker.begin();
42  *
43  *     .....
44  *
45  *     try (MessageTracker.Context context = tracker.received(message)) {
46  *
47  *         if (context.error().isPresent()){
48  *             LOG.error("{}", context.error().get());
49  *         }
50  *
51  *         // Some custom processing
52  *         process(message);
53  *     }
54  *
55  * </pre>
56  *
57  * <p>
58  * This class is NOT thread-safe.
59  */
60 @Beta
61 public final class MessageTracker {
62     public abstract static class Context implements AutoCloseable {
63         Context() {
64             // Hidden to prevent outside instantiation
65         }
66
67         public abstract Optional<Error> error();
68
69         @Override
70         public abstract void close();
71     }
72
73     public interface Error {
74         Object getLastExpectedMessage();
75
76         Object getCurrentExpectedMessage();
77
78         List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
79     }
80
81
82     public static final class MessageProcessingTime {
83         private final Class<?> messageClass;
84         private final long elapsedTimeInNanos;
85
86         MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
87             this.messageClass = requireNonNull(messageClass);
88             this.elapsedTimeInNanos = elapsedTimeInNanos;
89         }
90
91         @Override
92         public String toString() {
93             return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis="
94                    + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]";
95         }
96
97
98         public Class<?> getMessageClass() {
99             return messageClass;
100         }
101
102         public long getElapsedTimeInNanos() {
103             return elapsedTimeInNanos;
104         }
105     }
106
107     private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
108     private static final Context NO_OP_CONTEXT = new Context() {
109         @Override
110         public void close() {
111             // No-op
112         }
113
114         @Override
115         public Optional<Error> error() {
116             return Optional.empty();
117         }
118     };
119
120     private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
121
122     private final CurrentMessageContext currentMessageContext;
123
124     private final Stopwatch expectedMessageWatch;
125
126     private final Class<?> expectedMessageClass;
127
128     private final long expectedArrivalInterval;
129
130     private final Ticker ticker;
131
132     private Object lastExpectedMessage;
133
134     @VisibleForTesting
135     MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
136             final Ticker ticker) {
137         checkArgument(expectedArrivalIntervalInMillis >= 0);
138         this.expectedMessageClass = requireNonNull(expectedMessageClass);
139         this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
140         this.ticker = requireNonNull(ticker);
141         this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
142         this.currentMessageContext = new CurrentMessageContext();
143     }
144
145     /**
146      * Constructs an instance.
147      *
148      * @param expectedMessageClass the class of the message to track
149      * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected
150      *                                        message
151      */
152     public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
153         this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
154     }
155
156     public void begin() {
157         if (!expectedMessageWatch.isRunning()) {
158             LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
159             expectedMessageWatch.start();
160         }
161     }
162
163     public Context received(final Object message) {
164         if (!expectedMessageWatch.isRunning()) {
165             return NO_OP_CONTEXT;
166         }
167
168         if (expectedMessageClass.isInstance(message)) {
169             final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
170             if (actualElapsedTime > expectedArrivalInterval) {
171                 return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
172                         messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
173             }
174             lastExpectedMessage = message;
175             messagesSinceLastExpectedMessage.clear();
176             expectedMessageWatch.reset().start();
177         }
178
179         currentMessageContext.reset(message);
180         return currentMessageContext;
181     }
182
183     void processed(final Object message, final long messageElapseTimeInNanos) {
184         if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
185             messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
186                 messageElapseTimeInNanos));
187         }
188     }
189
190     public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
191         return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
192     }
193
194     private static final class FailedExpectation implements Error {
195         private final Object lastExpectedMessage;
196         private final Object currentExpectedMessage;
197         private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
198         private final long expectedTimeInMillis;
199         private final long actualTimeInMillis;
200
201         FailedExpectation(final Object lastExpectedMessage, final Object message,
202                 final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
203                 final long actualTimeNanos) {
204             this.lastExpectedMessage = lastExpectedMessage;
205             this.currentExpectedMessage = message;
206             this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
207             this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
208             this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
209         }
210
211         @Override
212         public Object getLastExpectedMessage() {
213             return lastExpectedMessage;
214         }
215
216         @Override
217         public Object getCurrentExpectedMessage() {
218             return currentExpectedMessage;
219         }
220
221         @Override
222         public List<MessageProcessingTime>  getMessageProcessingTimesSinceLastExpectedMessage() {
223             return messagesSinceLastExpectedMessage;
224         }
225
226         @Override
227         public String toString() {
228             StringBuilder builder = new StringBuilder()
229                     .append("\n> Last Expected Message = ").append(lastExpectedMessage)
230                     .append("\n> Current Expected Message = ").append(currentExpectedMessage)
231                     .append("\n> Expected time in between messages = ").append(expectedTimeInMillis)
232                     .append("\n> Actual time in between messages = ").append(actualTimeInMillis);
233             for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
234                 builder.append("\n\t> ").append(time);
235             }
236             return builder.toString();
237         }
238     }
239
240     private abstract class AbstractTimedContext extends Context {
241         abstract Object message();
242
243         abstract Stopwatch stopTimer();
244
245         @Override
246         public final void close() {
247             processed(message(), stopTimer().elapsed(NANOSECONDS));
248         }
249     }
250
251     private final class CurrentMessageContext extends AbstractTimedContext {
252         private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
253         private Object message;
254
255         void reset(final Object newMessage) {
256             this.message = requireNonNull(newMessage);
257             checkState(!stopwatch.isRunning(), "Trying to reset a context that is not done (%s). currentMessage = %s",
258                 this, newMessage);
259             stopwatch.start();
260         }
261
262         @Override
263         Object message() {
264             return message;
265         }
266
267         @Override
268         Stopwatch stopTimer() {
269             return stopwatch.stop();
270         }
271
272         @Override
273         public Optional<Error> error() {
274             return Optional.empty();
275         }
276     }
277
278     private final class ErrorContext extends AbstractTimedContext {
279         private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
280         private final Object message;
281         private final Error error;
282
283         ErrorContext(final Object message, final Error error) {
284             this.message = requireNonNull(message);
285             this.error = requireNonNull(error);
286         }
287
288         @Override
289         Object message() {
290             return message;
291         }
292
293         @Override
294         Stopwatch stopTimer() {
295             return stopwatch.stop();
296         }
297
298         @Override
299         public Optional<Error> error() {
300             return Optional.of(error);
301         }
302     }
303 }