package org.opendaylight.controller.cluster.common.actor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
* expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
* received between the arrival of two instances of the same message and the amount of time it took to process each
* of those messages.
- * <br/>
+ * <br>
* Usage of the API is as follows,
* <pre>
*
*
* .....
*
- * MessageTracker.Context context = tracker.received(message);
- *
- * if(context.error().isPresent()){
- * LOG.error("{}", context.error().get());
- * }
+ * try (MessageTracker.Context context = tracker.received(message)) {
*
- * // Some custom processing
- * process(message);
+ * if (context.error().isPresent()){
+ * LOG.error("{}", context.error().get());
+ * }
*
- * context.done();
+ * // Some custom processing
+ * process(message);
+ * }
*
* </pre>
*/
+@Beta
+@NotThreadSafe
public final class MessageTracker {
+ public abstract static class Context implements AutoCloseable {
+ Context() {
+ // Hidden to prevent outside instantiation
+ }
- private static final Context NO_OP_CONTEXT = new NoOpContext();
+ public abstract Optional<Error> error();
- private final Class<?> expectedMessageClass;
+ @Override
+ public abstract void close();
+ }
- private final long expectedArrivalInterval;
+ public interface Error {
+ Object getLastExpectedMessage();
- private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
+ Object getCurrentExpectedMessage();
- private Stopwatch expectedMessageWatch;
+ List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+ }
- private boolean enabled = false;
- private Object lastExpectedMessage;
+ public static final class MessageProcessingTime {
+ private final Class<?> messageClass;
+ private final long elapsedTimeInNanos;
- private Object currentMessage;
+ MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
+ this.messageClass = Preconditions.checkNotNull(messageClass);
+ this.elapsedTimeInNanos = elapsedTimeInNanos;
+ }
- private final CurrentMessageContext currentMessageContext = new CurrentMessageContext();
+ @Override
+ public String toString() {
+ return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis="
+ + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]";
+ }
- /**
- *
- * @param expectedMessageClass The class of the message to track
- * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
- * message
- */
- public MessageTracker(Class<?> expectedMessageClass, long expectedArrivalIntervalInMillis){
- this.expectedMessageClass = expectedMessageClass;
- this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
- }
- public void begin(){
- if(enabled) {
- return;
+ public Class<?> getMessageClass() {
+ return messageClass;
}
- enabled = true;
- expectedMessageWatch = Stopwatch.createStarted();
- }
- public Context received(Object message){
- if(!enabled) {
- return NO_OP_CONTEXT;
- }
- this.currentMessage = message;
- if(expectedMessageClass.isInstance(message)){
- long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS);
- if(actualElapsedTime > expectedArrivalInterval){
- return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message,
- ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval,
- actualElapsedTime)));
- }
- this.lastExpectedMessage = message;
- this.messagesSinceLastExpectedMessage.clear();
+ public long getElapsedTimeInNanos() {
+ return elapsedTimeInNanos;
}
-
- currentMessageContext.reset();
- return currentMessageContext;
}
- private void processed(Object message, long messageElapseTimeInNanos){
- if(!enabled) {
- return;
+ private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
+ private static final Context NO_OP_CONTEXT = new Context() {
+ @Override
+ public void close() {
+ // No-op
}
- if(!expectedMessageClass.isInstance(message)){
- this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos));
+
+ @Override
+ public Optional<Error> error() {
+ return Optional.absent();
}
- }
+ };
+
+ private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
+
+ private final CurrentMessageContext currentMessageContext;
+
+ private final Stopwatch expectedMessageWatch;
+
+ private final Class<?> expectedMessageClass;
+
+ private final long expectedArrivalInterval;
+
+ private final Ticker ticker;
- public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage(){
- return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage);
+ private Object lastExpectedMessage;
+
+ @VisibleForTesting
+ MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
+ final Ticker ticker) {
+ Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
+ this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
+ this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
+ this.ticker = Preconditions.checkNotNull(ticker);
+ this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
+ this.currentMessageContext = new CurrentMessageContext();
}
- public static class MessageProcessingTime {
- private final Class<?> messageClass;
- private final long elapsedTimeInNanos;
+ /**
+ * Constructs an instance.
+ *
+ * @param expectedMessageClass the class of the message to track
+ * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected
+ * message
+ */
+ public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
+ this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
+ }
- MessageProcessingTime(Class<?> messageClass, long elapsedTimeInNanos){
- this.messageClass = messageClass;
- this.elapsedTimeInNanos = elapsedTimeInNanos;
+ public void begin() {
+ if (!expectedMessageWatch.isRunning()) {
+ LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
+ expectedMessageWatch.start();
}
+ }
- @Override
- public String toString() {
- return "MessageProcessingTime{" +
- "messageClass=" + messageClass.getSimpleName() +
- ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) +
- '}';
+ public Context received(final Object message) {
+ if (!expectedMessageWatch.isRunning()) {
+ return NO_OP_CONTEXT;
}
- public Class<?> getMessageClass() {
- return messageClass;
+ if (expectedMessageClass.isInstance(message)) {
+ final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
+ if (actualElapsedTime > expectedArrivalInterval) {
+ return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
+ messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
+ }
+ lastExpectedMessage = message;
+ messagesSinceLastExpectedMessage.clear();
+ expectedMessageWatch.reset().start();
}
- public long getElapsedTimeInNanos() {
- return elapsedTimeInNanos;
- }
+ currentMessageContext.reset(message);
+ return currentMessageContext;
}
- public interface Error {
- Object getLastExpectedMessage();
- Object getCurrentExpectedMessage();
- List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+ void processed(final Object message, final long messageElapseTimeInNanos) {
+ if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
+ messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
+ messageElapseTimeInNanos));
+ }
}
- private class FailedExpectation implements Error {
+ public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
+ return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
+ }
+ private static final class FailedExpectation implements Error {
private final Object lastExpectedMessage;
private final Object currentExpectedMessage;
private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
private final long expectedTimeInMillis;
private final long actualTimeInMillis;
- public FailedExpectation(Object lastExpectedMessage, Object message, List<MessageProcessingTime> messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) {
+ FailedExpectation(final Object lastExpectedMessage, final Object message,
+ final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
+ final long actualTimeNanos) {
this.lastExpectedMessage = lastExpectedMessage;
this.currentExpectedMessage = message;
- this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage;
- this.expectedTimeInMillis = expectedTimeInMillis;
- this.actualTimeInMillis = actualTimeInMillis;
+ this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
+ this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
+ this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
}
+ @Override
public Object getLastExpectedMessage() {
return lastExpectedMessage;
}
+ @Override
public Object getCurrentExpectedMessage() {
return currentExpectedMessage;
}
+ @Override
public List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage() {
return messagesSinceLastExpectedMessage;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("\n> Last Expected Message = " + lastExpectedMessage);
- builder.append("\n> Current Expected Message = " + currentExpectedMessage);
- builder.append("\n> Expected time in between messages = " + expectedTimeInMillis);
- builder.append("\n> Actual time in between messages = " + actualTimeInMillis);
+ builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
+ builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
+ builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
+ builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
- builder.append("\n\t> ").append(time.toString());
+ builder.append("\n\t> ").append(time);
}
return builder.toString();
}
-
- }
-
- public interface Context {
- Context done();
- Optional<? extends Error> error();
}
- private static class NoOpContext implements Context {
+ private abstract class AbstractTimedContext extends Context {
+ abstract Object message();
- @Override
- public Context done() {
- return this;
- }
+ abstract Stopwatch stopTimer();
@Override
- public Optional<Error> error() {
- return Optional.absent();
+ public final void close() {
+ processed(message(), stopTimer().elapsed(NANOSECONDS));
}
}
- private class CurrentMessageContext implements Context {
- Stopwatch stopwatch = Stopwatch.createStarted();
- boolean done = true;
+ private final class CurrentMessageContext extends AbstractTimedContext {
+ private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
+ private Object message;
+
+ void reset(final Object newMessage) {
+ this.message = Preconditions.checkNotNull(newMessage);
+ Preconditions.checkState(!stopwatch.isRunning(),
+ "Trying to reset a context that is not done (%s). currentMessage = %s", this, newMessage);
+ stopwatch.start();
+ }
- public void reset(){
- Preconditions.checkState(done,
- String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage));
- done = false;
- stopwatch.reset().start();
+ @Override
+ Object message() {
+ return message;
}
@Override
- public Context done() {
- processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS));
- done = true;
- return this;
+ Stopwatch stopTimer() {
+ return stopwatch.stop();
}
@Override
- public Optional<? extends Error> error() {
+ public Optional<Error> error() {
return Optional.absent();
}
}
- private class ErrorContext implements Context {
- Object message;
- private final Optional<? extends Error> error;
- Stopwatch stopwatch;
+ private final class ErrorContext extends AbstractTimedContext {
+ private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
+ private final Object message;
+ private final Error error;
+
+ ErrorContext(final Object message, final Error error) {
+ this.message = Preconditions.checkNotNull(message);
+ this.error = Preconditions.checkNotNull(error);
+ }
- ErrorContext(Object message, Optional<? extends Error> error){
- this.message = message;
- this.error = error;
- this.stopwatch = Stopwatch.createStarted();
+ @Override
+ Object message() {
+ return message;
}
@Override
- public Context done(){
- processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS));
- this.stopwatch.stop();
- return this;
+ Stopwatch stopTimer() {
+ return stopwatch.stop();
}
@Override
- public Optional<? extends Error> error() {
- return error;
+ public Optional<Error> error() {
+ return Optional.of(error);
}
}
}