2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.common.actor;
11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
12 import static java.util.concurrent.TimeUnit.NANOSECONDS;
14 import com.google.common.annotations.Beta;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import com.google.common.base.Stopwatch;
19 import com.google.common.base.Ticker;
20 import com.google.common.collect.ImmutableList;
21 import java.util.LinkedList;
22 import java.util.List;
23 import javax.annotation.concurrent.NotThreadSafe;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
28 * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
29 * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
30 * received between the arrival of two instances of the same message and the amount of time it took to process each
33 * Usage of the API is as follows,
36 * // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
37 * MessageTracker tracker = new MessageTracker(Foo.class, 10);
39 * // Begin the tracking process. If this is not called then calling received and done on the resultant Context
45 * try (MessageTracker.Context context = tracker.received(message)) {
47 * if (context.error().isPresent()){
48 * LOG.error("{}", context.error().get());
51 * // Some custom processing
59 public final class MessageTracker {
60 public abstract static class Context implements AutoCloseable {
62 // Hidden to prevent outside instantiation
65 public abstract Optional<Error> error();
68 public abstract void close();
71 public interface Error {
72 Object getLastExpectedMessage();
74 Object getCurrentExpectedMessage();
76 List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
80 public static final class MessageProcessingTime {
81 private final Class<?> messageClass;
82 private final long elapsedTimeInNanos;
84 MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
85 this.messageClass = Preconditions.checkNotNull(messageClass);
86 this.elapsedTimeInNanos = elapsedTimeInNanos;
90 public String toString() {
91 return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis="
92 + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]";
96 public Class<?> getMessageClass() {
100 public long getElapsedTimeInNanos() {
101 return elapsedTimeInNanos;
105 private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
106 private static final Context NO_OP_CONTEXT = new Context() {
108 public void close() {
113 public Optional<Error> error() {
114 return Optional.absent();
118 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
120 private final CurrentMessageContext currentMessageContext;
122 private final Stopwatch expectedMessageWatch;
124 private final Class<?> expectedMessageClass;
126 private final long expectedArrivalInterval;
128 private final Ticker ticker;
130 private Object lastExpectedMessage;
133 MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
134 final Ticker ticker) {
135 Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
136 this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
137 this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
138 this.ticker = Preconditions.checkNotNull(ticker);
139 this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
140 this.currentMessageContext = new CurrentMessageContext();
144 * Constructs an instance.
146 * @param expectedMessageClass the class of the message to track
147 * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected
150 public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
151 this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
154 public void begin() {
155 if (!expectedMessageWatch.isRunning()) {
156 LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
157 expectedMessageWatch.start();
161 public Context received(final Object message) {
162 if (!expectedMessageWatch.isRunning()) {
163 return NO_OP_CONTEXT;
166 if (expectedMessageClass.isInstance(message)) {
167 final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
168 if (actualElapsedTime > expectedArrivalInterval) {
169 return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
170 messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
172 lastExpectedMessage = message;
173 messagesSinceLastExpectedMessage.clear();
174 expectedMessageWatch.reset().start();
177 currentMessageContext.reset(message);
178 return currentMessageContext;
181 void processed(final Object message, final long messageElapseTimeInNanos) {
182 if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
183 messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
184 messageElapseTimeInNanos));
188 public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
189 return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
192 private static final class FailedExpectation implements Error {
193 private final Object lastExpectedMessage;
194 private final Object currentExpectedMessage;
195 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
196 private final long expectedTimeInMillis;
197 private final long actualTimeInMillis;
199 FailedExpectation(final Object lastExpectedMessage, final Object message,
200 final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
201 final long actualTimeNanos) {
202 this.lastExpectedMessage = lastExpectedMessage;
203 this.currentExpectedMessage = message;
204 this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
205 this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
206 this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
210 public Object getLastExpectedMessage() {
211 return lastExpectedMessage;
215 public Object getCurrentExpectedMessage() {
216 return currentExpectedMessage;
220 public List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage() {
221 return messagesSinceLastExpectedMessage;
225 public String toString() {
226 StringBuilder builder = new StringBuilder();
227 builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
228 builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
229 builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
230 builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
231 for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
232 builder.append("\n\t> ").append(time);
234 return builder.toString();
238 private abstract class AbstractTimedContext extends Context {
239 abstract Object message();
241 abstract Stopwatch stopTimer();
244 public final void close() {
245 processed(message(), stopTimer().elapsed(NANOSECONDS));
249 private final class CurrentMessageContext extends AbstractTimedContext {
250 private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
251 private Object message;
253 void reset(final Object newMessage) {
254 this.message = Preconditions.checkNotNull(newMessage);
255 Preconditions.checkState(!stopwatch.isRunning(),
256 "Trying to reset a context that is not done (%s). currentMessage = %s", this, newMessage);
266 Stopwatch stopTimer() {
267 return stopwatch.stop();
271 public Optional<Error> error() {
272 return Optional.absent();
276 private final class ErrorContext extends AbstractTimedContext {
277 private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
278 private final Object message;
279 private final Error error;
281 ErrorContext(final Object message, final Error error) {
282 this.message = Preconditions.checkNotNull(message);
283 this.error = Preconditions.checkNotNull(error);
292 Stopwatch stopTimer() {
293 return stopwatch.stop();
297 public Optional<Error> error() {
298 return Optional.of(error);