2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.common.actor;
11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
12 import static java.util.concurrent.TimeUnit.NANOSECONDS;
13 import com.google.common.annotations.Beta;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.base.Stopwatch;
18 import com.google.common.base.Ticker;
19 import com.google.common.collect.ImmutableList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import javax.annotation.concurrent.NotThreadSafe;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
28 * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages
29 * received between the arrival of two instances of the same message and the amount of time it took to process each
32 * Usage of the API is as follows,
35 * // Track the Foo class. Here we expect to see a message of type Foo come in every 10 millis
36 * MessageTracker tracker = new MessageTracker(Foo.class, 10);
38 * // Begin the tracking process. If this is not called then calling received and done on the resultant Context
44 * MessageTracker.Context context = tracker.received(message);
46 * if(context.error().isPresent()){
47 * LOG.error("{}", context.error().get());
50 * // Some custom processing
59 public final class MessageTracker {
/**
 * Handle returned by MessageTracker.received(). It exposes done() and error(), both of which are implemented
 * by the tracker's own context classes.
 * NOTE(review): the constructor declaration that the "Hidden to prevent outside instantiation" comment belongs
 * to is not visible in this listing; its exact visibility should be confirmed against the full file.
 */
60 public static abstract class Context {
62 // Hidden to prevent outside instantiation
// Signals that handling of the message is finished; returns a Context.
65 public abstract Context done();
// Describes the failed arrival expectation, if any, associated with this context.
66 public abstract Optional<Error> error();
69 public interface Error {
70 Object getLastExpectedMessage();
71 Object getCurrentExpectedMessage();
72 List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
/**
 * Value holder pairing a message class with the time, in nanoseconds, spent processing one message of that
 * class. Both fields are final.
 */
76 public static final class MessageProcessingTime {
77 private final Class<?> messageClass;
78 private final long elapsedTimeInNanos;
// A null message class is rejected by Preconditions.checkNotNull; the elapsed time is stored as given.
80 MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
81 this.messageClass = Preconditions.checkNotNull(messageClass);
82 this.elapsedTimeInNanos = elapsedTimeInNanos;
// Renders the simple class name and the elapsed time converted from nanoseconds to whole milliseconds.
// NOTE(review): the closing part of this concatenation is not visible in this listing.
86 public String toString() {
87 return "MessageProcessingTime{" +
88 "messageClass=" + messageClass.getSimpleName() +
89 ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) +
// NOTE(review): the body of this accessor is not visible here; presumably it returns messageClass - confirm.
93 public Class<?> getMessageClass() {
// Returns the stored elapsed time, unconverted, in nanoseconds.
97 public long getElapsedTimeInNanos() {
98 return elapsedTimeInNanos;
// Logger for this class.
102 private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
// Shared context handed out by received() while the expected-message stopwatch is not running; its error()
// is always absent.
// NOTE(review): the body of done() is not visible in this listing.
103 private static final Context NO_OP_CONTEXT = new Context() {
105 public Context done() {
110 public Optional<Error> error() {
111 return Optional.absent();
// Processing times appended by processed() for messages that are not of the expected class; cleared by
// received() when an expected message arrives within the interval.
115 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
// Single context instance that received() resets and returns on its non-error path.
117 private final CurrentMessageContext currentMessageContext;
// Started by begin() and restarted by received() on an on-time expected message; its isRunning() state is
// also what received() and processed() check to decide whether tracking is active.
119 private final Stopwatch expectedMessageWatch;
// Class of the message whose arrival interval is being watched.
121 private final Class<?> expectedMessageClass;
// Allowed interval between expected messages, stored in nanoseconds (converted from millis in the constructor).
123 private final long expectedArrivalInterval;
// Time source used for every Stopwatch created by this tracker.
125 private final Ticker ticker;
// Last expected message that arrived within the interval; stays null until received() records one.
127 private Object lastExpectedMessage;
/**
 * Creates a tracker that reads time from the supplied ticker. Tracking does not start until
 * {@link #begin()} is called, because the expected-message stopwatch is created unstarted.
 *
 * @param expectedMessageClass the class of the message to track, must not be null
 * @param expectedArrivalIntervalInMillis the allowed interval between two expected messages, in
 *                                        milliseconds; must not be negative
 * @param ticker the time source for all stopwatches of this tracker, must not be null
 * @throws IllegalArgumentException if the interval is negative
 * @throws NullPointerException if the message class or the ticker is null
 */
MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
        final Ticker ticker) {
    // Validation order is significant: interval first, then message class, then ticker.
    Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
    this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);

    // The interval is kept in nanoseconds so it can be compared directly with stopwatch readings.
    expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);

    this.ticker = Preconditions.checkNotNull(ticker);
    expectedMessageWatch = Stopwatch.createUnstarted(this.ticker);

    // Must come after the ticker assignment: the context's stopwatch initializer reads the ticker field.
    currentMessageContext = new CurrentMessageContext();
}
/**
 * Creates a tracker that measures time with the system ticker.
 *
 * @param expectedMessageClass The class of the message to track
 * @param expectedArrivalIntervalInMillis The expected arrival interval, in milliseconds, between two
 *                                        instances of the expected message
 */
public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
    this(
        expectedMessageClass,
        expectedArrivalIntervalInMillis,
        Ticker.systemTicker());
}
150 public void begin() {
151 if (!expectedMessageWatch.isRunning()) {
152 LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
153 expectedMessageWatch.start();
157 public Context received(final Object message) {
158 if (!expectedMessageWatch.isRunning()) {
159 return NO_OP_CONTEXT;
162 if (expectedMessageClass.isInstance(message)) {
163 final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
164 if (actualElapsedTime > expectedArrivalInterval) {
165 return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
166 messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
168 lastExpectedMessage = message;
169 messagesSinceLastExpectedMessage.clear();
170 expectedMessageWatch.reset().start();
173 currentMessageContext.reset(message);
174 return currentMessageContext;
177 void processed(final Object message, final long messageElapseTimeInNanos) {
178 if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
179 messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
180 messageElapseTimeInNanos));
184 public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
185 return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
188 private static final class FailedExpectation implements Error {
189 private final Object lastExpectedMessage;
190 private final Object currentExpectedMessage;
191 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
192 private final long expectedTimeInMillis;
193 private final long actualTimeInMillis;
195 FailedExpectation(final Object lastExpectedMessage, final Object message,
196 final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
197 final long actualTimeNanos) {
198 this.lastExpectedMessage = lastExpectedMessage;
199 this.currentExpectedMessage = message;
200 this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
201 this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
202 this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
206 public Object getLastExpectedMessage() {
207 return lastExpectedMessage;
211 public Object getCurrentExpectedMessage() {
212 return currentExpectedMessage;
216 public List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage() {
217 return messagesSinceLastExpectedMessage;
221 public String toString() {
222 StringBuilder builder = new StringBuilder();
223 builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
224 builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
225 builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
226 builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
227 for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
228 builder.append("\n\t> ").append(time);
230 return builder.toString();
/**
 * Base class for contexts that time the processing of one message. Subclasses supply the message and the
 * stopwatch; done() forwards both to the enclosing tracker's processed().
 */
234 private abstract class AbstractTimedContext extends Context {
// The message this context is associated with.
235 abstract Object message();
// Supplies the stopwatch whose elapsed time done() reads; the visible implementations stop it first.
236 abstract Stopwatch stopTimer();
// Reports the message and its elapsed processing time, in nanoseconds, to processed().
// message() is evaluated before stopTimer().
// NOTE(review): the return statement of this method is not visible in this listing.
239 public final Context done() {
240 processed(message(), stopTimer().elapsed(NANOSECONDS));
/**
 * Reusable timed context that received() resets and returns whenever it does not report a failed
 * expectation; its error() is always absent.
 */
245 private final class CurrentMessageContext extends AbstractTimedContext {
// Created unstarted on the tracker's ticker.
246 private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
// Message currently bound to this context; replaced on every reset().
247 private Object message;
// Binds this context to a new message. A null message is rejected, and a still-running stopwatch (the
// previous message was never completed through done()) fails the state check.
// NOTE(review): no visible line starts the stopwatch although stopTimer() stops it; presumably reset()
// starts it in a line not shown in this listing - confirm.
249 void reset(final Object message) {
// The field is overwritten before the state check below runs.
250 this.message = Preconditions.checkNotNull(message);
251 Preconditions.checkState(!stopwatch.isRunning(),
252 "Trying to reset a context that is not done (%s). currentMessage = %s", this, message);
// Stops the per-message stopwatch and returns it so the caller can read the elapsed time.
262 Stopwatch stopTimer() {
263 return stopwatch.stop();
// This context never carries an error.
267 public Optional<Error> error() {
268 return Optional.absent();
/**
 * Timed context created by received() when an expected message arrives later than the allowed interval; it
 * carries the Error describing that failure.
 * NOTE(review): the message() override required by AbstractTimedContext is not visible in this listing.
 */
272 private final class ErrorContext extends AbstractTimedContext {
// Started at construction time, on the tracker's ticker.
273 private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
274 private final Object message;
275 private final Error error;
// Both arguments are mandatory; null is rejected by Preconditions.checkNotNull.
277 ErrorContext(final Object message, final Error error) {
278 this.message = Preconditions.checkNotNull(message);
279 this.error = Preconditions.checkNotNull(error);
// Stops the stopwatch and returns it so the caller can read the elapsed time.
288 Stopwatch stopTimer() {
289 return stopwatch.stop();
// Always present for this context type.
293 public Optional<Error> error() {
294 return Optional.of(error);