2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.common.actor;
11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
12 import static java.util.concurrent.TimeUnit.NANOSECONDS;
13 import com.google.common.annotations.Beta;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.base.Stopwatch;
18 import com.google.common.base.Ticker;
19 import com.google.common.collect.ImmutableList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import javax.annotation.concurrent.NotThreadSafe;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
28 * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
29 * received between the arrival of two instances of the same message and the amount of time it took to process each
32 * Usage of the API is as follows,
35 * // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
36 * MessageTracker tracker = new MessageTracker(Foo.class, 10);
38 * // Begin the tracking process. If this is not called then calling received and done on the resultant Context
44 * try (MessageTracker.Context context = tracker.received(message)) {
46 * if (context.error().isPresent()){
47 * LOG.error("{}", context.error().get());
50 * // Some custom processing
58 public final class MessageTracker {
59 public static abstract class Context implements AutoCloseable {
61 // Hidden to prevent outside instantiation
64 public abstract Optional<Error> error();
67 public abstract void close();
70 public interface Error {
71 Object getLastExpectedMessage();
72 Object getCurrentExpectedMessage();
73 List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
77 public static final class MessageProcessingTime {
78 private final Class<?> messageClass;
79 private final long elapsedTimeInNanos;
81 MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
82 this.messageClass = Preconditions.checkNotNull(messageClass);
83 this.elapsedTimeInNanos = elapsedTimeInNanos;
87 public String toString() {
88 return "MessageProcessingTime{" +
89 "messageClass=" + messageClass.getSimpleName() +
90 ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) +
94 public Class<?> getMessageClass() {
98 public long getElapsedTimeInNanos() {
99 return elapsedTimeInNanos;
103 private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
104 private static final Context NO_OP_CONTEXT = new Context() {
106 public void close() {
111 public Optional<Error> error() {
112 return Optional.absent();
116 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
118 private final CurrentMessageContext currentMessageContext;
120 private final Stopwatch expectedMessageWatch;
122 private final Class<?> expectedMessageClass;
124 private final long expectedArrivalInterval;
126 private final Ticker ticker;
128 private Object lastExpectedMessage;
131 MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
132 final Ticker ticker) {
133 Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
134 this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
135 this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
136 this.ticker = Preconditions.checkNotNull(ticker);
137 this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
138 this.currentMessageContext = new CurrentMessageContext();
143 * @param expectedMessageClass The class of the message to track
144 * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
147 public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
148 this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
151 public void begin() {
152 if (!expectedMessageWatch.isRunning()) {
153 LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
154 expectedMessageWatch.start();
158 public Context received(final Object message) {
159 if (!expectedMessageWatch.isRunning()) {
160 return NO_OP_CONTEXT;
163 if (expectedMessageClass.isInstance(message)) {
164 final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
165 if (actualElapsedTime > expectedArrivalInterval) {
166 return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
167 messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
169 lastExpectedMessage = message;
170 messagesSinceLastExpectedMessage.clear();
171 expectedMessageWatch.reset().start();
174 currentMessageContext.reset(message);
175 return currentMessageContext;
178 void processed(final Object message, final long messageElapseTimeInNanos) {
179 if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
180 messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
181 messageElapseTimeInNanos));
185 public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
186 return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
189 private static final class FailedExpectation implements Error {
190 private final Object lastExpectedMessage;
191 private final Object currentExpectedMessage;
192 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
193 private final long expectedTimeInMillis;
194 private final long actualTimeInMillis;
196 FailedExpectation(final Object lastExpectedMessage, final Object message,
197 final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
198 final long actualTimeNanos) {
199 this.lastExpectedMessage = lastExpectedMessage;
200 this.currentExpectedMessage = message;
201 this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
202 this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
203 this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
207 public Object getLastExpectedMessage() {
208 return lastExpectedMessage;
212 public Object getCurrentExpectedMessage() {
213 return currentExpectedMessage;
217 public List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage() {
218 return messagesSinceLastExpectedMessage;
222 public String toString() {
223 StringBuilder builder = new StringBuilder();
224 builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
225 builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
226 builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
227 builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
228 for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
229 builder.append("\n\t> ").append(time);
231 return builder.toString();
235 private abstract class AbstractTimedContext extends Context {
236 abstract Object message();
237 abstract Stopwatch stopTimer();
240 public final void close() {
241 processed(message(), stopTimer().elapsed(NANOSECONDS));
245 private final class CurrentMessageContext extends AbstractTimedContext {
246 private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
247 private Object message;
249 void reset(final Object message) {
250 this.message = Preconditions.checkNotNull(message);
251 Preconditions.checkState(!stopwatch.isRunning(),
252 "Trying to reset a context that is not done (%s). currentMessage = %s", this, message);
262 Stopwatch stopTimer() {
263 return stopwatch.stop();
267 public Optional<Error> error() {
268 return Optional.absent();
272 private final class ErrorContext extends AbstractTimedContext {
273 private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
274 private final Object message;
275 private final Error error;
277 ErrorContext(final Object message, final Error error) {
278 this.message = Preconditions.checkNotNull(message);
279 this.error = Preconditions.checkNotNull(error);
288 Stopwatch stopTimer() {
289 return stopwatch.stop();
293 public Optional<Error> error() {
294 return Optional.of(error);