X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-clustering-commons%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fcommon%2Factor%2FMessageTracker.java;h=08516252fd601608f661f44293efe105de914033;hb=c59d11dbcee9714bf7739ff12c3f7455a388f369;hp=f046240734762e8611a544526e740bd461378924;hpb=93c17c55ca0c3690d77c96cdfed91b6f186fe56a;p=controller.git diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java index f046240734..08516252fd 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java @@ -8,13 +8,20 @@ package org.opendaylight.controller.cluster.common.actor; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.NotThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was @@ -47,25 +54,88 @@ import java.util.concurrent.TimeUnit; * * */ +@Beta +@NotThreadSafe public final class MessageTracker { + public static abstract class Context { + Context() { + // Hidden to prevent outside instantiation + } - private static final Context NO_OP_CONTEXT = new NoOpContext(); + public abstract Context done(); + public abstract Optional error(); + } - private final Class expectedMessageClass; + public interface Error { + Object getLastExpectedMessage(); + Object getCurrentExpectedMessage(); + List getMessageProcessingTimesSinceLastExpectedMessage(); + } - private final long expectedArrivalInterval; + + public static final class MessageProcessingTime { + private final Class messageClass; + private final long elapsedTimeInNanos; + + MessageProcessingTime(final Class messageClass, final long elapsedTimeInNanos) { + this.messageClass = Preconditions.checkNotNull(messageClass); + this.elapsedTimeInNanos = elapsedTimeInNanos; + } + + @Override + public String toString() { + return "MessageProcessingTime{" + + "messageClass=" + messageClass.getSimpleName() + + ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) + + '}'; + } + + public Class getMessageClass() { + return messageClass; + } + + public long getElapsedTimeInNanos() { + return elapsedTimeInNanos; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class); + private static final Context NO_OP_CONTEXT = new Context() { + @Override + public Context done() { + return this; + } + + @Override + public Optional error() { + return Optional.absent(); + } + }; private final List messagesSinceLastExpectedMessage = new LinkedList<>(); - private Stopwatch expectedMessageWatch; + private final CurrentMessageContext currentMessageContext; - private boolean enabled = false; + private final Stopwatch expectedMessageWatch; - private Object lastExpectedMessage; + private final Class expectedMessageClass; + + private final long expectedArrivalInterval; - private Object currentMessage; + private final Ticker ticker; - private final CurrentMessageContext currentMessageContext = new CurrentMessageContext(); + private Object lastExpectedMessage; + + @VisibleForTesting + MessageTracker(final Class expectedMessageClass, final long expectedArrivalIntervalInMillis, + final Ticker ticker) { + Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0); + this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass); + this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis); + this.ticker = Preconditions.checkNotNull(ticker); + this.expectedMessageWatch = Stopwatch.createUnstarted(ticker); + this.currentMessageContext = new CurrentMessageContext(); + } /** * @@ -73,108 +143,76 @@ public final class MessageTracker { * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected * message */ - public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){ - this.expectedMessageClass = expectedMessageClass; - this.expectedArrivalInterval = expectedArrivalIntervalInMillis; + public MessageTracker(final Class expectedMessageClass, final long expectedArrivalIntervalInMillis) { + this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker()); } - public void begin(){ - if(enabled) { - return; + public void begin() { + if (!expectedMessageWatch.isRunning()) { + LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval); + expectedMessageWatch.start(); } - enabled = true; - expectedMessageWatch = Stopwatch.createStarted(); } - public Context received(Object message){ - if(!enabled) { + public Context received(final Object message) { + if (!expectedMessageWatch.isRunning()) { return NO_OP_CONTEXT; } - this.currentMessage = message; - if(expectedMessageClass.isInstance(message)){ - long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS); - if(actualElapsedTime > expectedArrivalInterval){ - return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message, - ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval, - actualElapsedTime))); + + if (expectedMessageClass.isInstance(message)) { + final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS); + if (actualElapsedTime > expectedArrivalInterval) { + return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message, + messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime)); } - this.lastExpectedMessage = message; - this.messagesSinceLastExpectedMessage.clear(); + lastExpectedMessage = message; + messagesSinceLastExpectedMessage.clear(); + expectedMessageWatch.reset().start(); } - currentMessageContext.reset(); + currentMessageContext.reset(message); return currentMessageContext; } - private void processed(Object message, long messageElapseTimeInNanos){ - if(!enabled) { - return; - } - if(!expectedMessageClass.isInstance(message)){ - this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos)); - } - } - - public List getMessagesSinceLastExpectedMessage(){ - return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage); - } - - public static class MessageProcessingTime { - private final Class messageClass; - private final long elapsedTimeInNanos; - - MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){ - this.messageClass = messageClass; - this.elapsedTimeInNanos = elapsedTimeInNanos; - } - - @Override - public String toString() { - return "MessageProcessingTime{" + - "messageClass=" + messageClass.getSimpleName() + - ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) + - '}'; - } - - public Class getMessageClass() { - return messageClass; - } - - public long getElapsedTimeInNanos() { - return elapsedTimeInNanos; + void processed(final Object message, final long messageElapseTimeInNanos) { + if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) { + messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), + messageElapseTimeInNanos)); } } - public interface Error { - Object getLastExpectedMessage(); - Object getCurrentExpectedMessage(); - List getMessageProcessingTimesSinceLastExpectedMessage(); + public List getMessagesSinceLastExpectedMessage() { + return ImmutableList.copyOf(messagesSinceLastExpectedMessage); } - private class FailedExpectation implements Error { - + private static final class FailedExpectation implements Error { private final Object lastExpectedMessage; private final Object currentExpectedMessage; private final List messagesSinceLastExpectedMessage; private final long expectedTimeInMillis; private final long actualTimeInMillis; - public FailedExpectation(Object lastExpectedMessage, Object message, List messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) { + FailedExpectation(final Object lastExpectedMessage, final Object message, + final List messagesSinceLastExpectedMessage, final long expectedTimeNanos, + final long actualTimeNanos) { this.lastExpectedMessage = lastExpectedMessage; this.currentExpectedMessage = message; - this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage; - this.expectedTimeInMillis = expectedTimeInMillis; - this.actualTimeInMillis = actualTimeInMillis; + this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage); + this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos); + this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos); } + @Override public Object getLastExpectedMessage() { return lastExpectedMessage; } + @Override public Object getCurrentExpectedMessage() { return currentExpectedMessage; } + @Override public List getMessageProcessingTimesSinceLastExpectedMessage() { return messagesSinceLastExpectedMessage; } @@ -182,81 +220,78 @@ public final class MessageTracker { @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("\n> Last Expected Message = " + lastExpectedMessage); - builder.append("\n> Current Expected Message = " + currentExpectedMessage); - builder.append("\n> Expected time in between messages = " + expectedTimeInMillis); - builder.append("\n> Actual time in between messages = " + actualTimeInMillis); + builder.append("\n> Last Expected Message = ").append(lastExpectedMessage); + builder.append("\n> Current Expected Message = ").append(currentExpectedMessage); + builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis); + builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis); for (MessageProcessingTime time : messagesSinceLastExpectedMessage) { - builder.append("\n\t> ").append(time.toString()); + builder.append("\n\t> ").append(time); } return builder.toString(); } - } - public interface Context { - Context done(); - Optional error(); - } - - private static class NoOpContext implements Context { + private abstract class AbstractTimedContext extends Context { + abstract Object message(); + abstract Stopwatch stopTimer(); @Override - public Context done() { + public final Context done() { + processed(message(), stopTimer().elapsed(NANOSECONDS)); return this; } - - @Override - public Optional error() { - return Optional.absent(); - } } - private class CurrentMessageContext implements Context { - Stopwatch stopwatch = Stopwatch.createStarted(); - boolean done = true; + private final class CurrentMessageContext extends AbstractTimedContext { + private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker); + private Object message; - public void reset(){ - Preconditions.checkState(done, - String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage)); - done = false; - stopwatch.reset().start(); + void reset(final Object message) { + this.message = Preconditions.checkNotNull(message); + Preconditions.checkState(!stopwatch.isRunning(), + "Trying to reset a context that is not done (%s). currentMessage = %s", this, message); + stopwatch.start(); } @Override - public Context done() { - processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS)); - done = true; - return this; + Object message() { + return message; + } + + @Override + Stopwatch stopTimer() { + return stopwatch.stop(); } @Override - public Optional error() { + public Optional error() { return Optional.absent(); } } - private class ErrorContext implements Context { - Object message; - private final Optional error; - Stopwatch stopwatch; + private final class ErrorContext extends AbstractTimedContext { + private final Stopwatch stopwatch = Stopwatch.createStarted(ticker); + private final Object message; + private final Error error; - ErrorContext(Object message, Optional error){ - this.message = message; - this.error = error; - this.stopwatch = Stopwatch.createStarted(); + ErrorContext(final Object message, final Error error) { + this.message = Preconditions.checkNotNull(message); + this.error = Preconditions.checkNotNull(error); } @Override - public Context done(){ - processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS)); - this.stopwatch.stop(); - return this; + Object message() { + return message; + } + + @Override + Stopwatch stopTimer() { + return stopwatch.stop(); } @Override - public Optional error() { - return error; + public Optional error() { + return Optional.of(error); } } }