X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-clustering-commons%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fcommon%2Factor%2FMessageTracker.java;h=1ba5ac6bdeb485a53fee0948df88b824a4b32ba6;hb=e373f634d72fa6ef692d4fd0d016c6deeba8ffe8;hp=f046240734762e8611a544526e740bd461378924;hpb=93c17c55ca0c3690d77c96cdfed91b6f186fe56a;p=controller.git diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java index f046240734..1ba5ac6bde 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java @@ -5,23 +5,31 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.common.actor; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that * received between the arrival of two instances of the same message and the amount of time it took to process each * of those messages. - *
+ *
* Usage of the API is as follows, *
  *
@@ -34,229 +42,262 @@ import java.util.concurrent.TimeUnit;
  *
  *     .....
  *
- *     MessageTracker.Context context = tracker.received(message);
- *
- *     if(context.error().isPresent()){
- *         LOG.error("{}", context.error().get());
- *     }
+ *     try (MessageTracker.Context context = tracker.received(message)) {
  *
- *     // Some custom processing
- *     process(message);
+ *         if (context.error().isPresent()){
+ *             LOG.error("{}", context.error().get());
+ *         }
  *
- *     context.done();
+ *         // Some custom processing
+ *         process(message);
+ *     }
  *
  * 
+ * + *

+ * This class is NOT thread-safe. */ +@Beta public final class MessageTracker { + public abstract static class Context implements AutoCloseable { + Context() { + // Hidden to prevent outside instantiation + } - private static final Context NO_OP_CONTEXT = new NoOpContext(); + public abstract Optional error(); - private final Class expectedMessageClass; + @Override + public abstract void close(); + } - private final long expectedArrivalInterval; + public interface Error { + Object getLastExpectedMessage(); - private final List messagesSinceLastExpectedMessage = new LinkedList<>(); + Object getCurrentExpectedMessage(); - private Stopwatch expectedMessageWatch; + List getMessageProcessingTimesSinceLastExpectedMessage(); + } - private boolean enabled = false; - private Object lastExpectedMessage; + public static final class MessageProcessingTime { + private final Class messageClass; + private final long elapsedTimeInNanos; - private Object currentMessage; + MessageProcessingTime(final Class messageClass, final long elapsedTimeInNanos) { + this.messageClass = requireNonNull(messageClass); + this.elapsedTimeInNanos = elapsedTimeInNanos; + } - private final CurrentMessageContext currentMessageContext = new CurrentMessageContext(); + @Override + public String toString() { + return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis=" + + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]"; + } - /** - * - * @param expectedMessageClass The class of the message to track - * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected - * message - */ - public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){ - this.expectedMessageClass = expectedMessageClass; - this.expectedArrivalInterval = expectedArrivalIntervalInMillis; - } - public void begin(){ - if(enabled) { - return; + public Class getMessageClass() { + return messageClass; } - enabled = true; - expectedMessageWatch = Stopwatch.createStarted(); - } - public Context received(Object message){ - if(!enabled) { - return NO_OP_CONTEXT; - } - this.currentMessage = message; - if(expectedMessageClass.isInstance(message)){ - long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS); - if(actualElapsedTime > expectedArrivalInterval){ - return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message, - ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval, - actualElapsedTime))); - } - this.lastExpectedMessage = message; - this.messagesSinceLastExpectedMessage.clear(); + public long getElapsedTimeInNanos() { + return elapsedTimeInNanos; } - - currentMessageContext.reset(); - return currentMessageContext; } - private void processed(Object message, long messageElapseTimeInNanos){ - if(!enabled) { - return; + private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class); + private static final Context NO_OP_CONTEXT = new Context() { + @Override + public void close() { + // No-op } - if(!expectedMessageClass.isInstance(message)){ - this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos)); + + @Override + public Optional error() { + return Optional.empty(); } - } + }; + + private final List messagesSinceLastExpectedMessage = new LinkedList<>(); + + private final CurrentMessageContext currentMessageContext; + + private final Stopwatch expectedMessageWatch; - public List getMessagesSinceLastExpectedMessage(){ - return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage); + private final Class expectedMessageClass; + + private final long expectedArrivalInterval; + + private final Ticker ticker; + + private Object lastExpectedMessage; + + @VisibleForTesting + MessageTracker(final Class expectedMessageClass, final long expectedArrivalIntervalInMillis, + final Ticker ticker) { + checkArgument(expectedArrivalIntervalInMillis >= 0); + this.expectedMessageClass = requireNonNull(expectedMessageClass); + this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis); + this.ticker = requireNonNull(ticker); + this.expectedMessageWatch = Stopwatch.createUnstarted(ticker); + this.currentMessageContext = new CurrentMessageContext(); } - public static class MessageProcessingTime { - private final Class messageClass; - private final long elapsedTimeInNanos; + /** + * Constructs an instance. + * + * @param expectedMessageClass the class of the message to track + * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected + * message + */ + public MessageTracker(final Class expectedMessageClass, final long expectedArrivalIntervalInMillis) { + this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker()); + } - MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){ - this.messageClass = messageClass; - this.elapsedTimeInNanos = elapsedTimeInNanos; + public void begin() { + if (!expectedMessageWatch.isRunning()) { + LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval); + expectedMessageWatch.start(); } + } - @Override - public String toString() { - return "MessageProcessingTime{" + - "messageClass=" + messageClass.getSimpleName() + - ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) + - '}'; + public Context received(final Object message) { + if (!expectedMessageWatch.isRunning()) { + return NO_OP_CONTEXT; } - public Class getMessageClass() { - return messageClass; + if (expectedMessageClass.isInstance(message)) { + final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS); + if (actualElapsedTime > expectedArrivalInterval) { + return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message, + messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime)); + } + lastExpectedMessage = message; + messagesSinceLastExpectedMessage.clear(); + expectedMessageWatch.reset().start(); } - public long getElapsedTimeInNanos() { - return elapsedTimeInNanos; - } + currentMessageContext.reset(message); + return currentMessageContext; } - public interface Error { - Object getLastExpectedMessage(); - Object getCurrentExpectedMessage(); - List getMessageProcessingTimesSinceLastExpectedMessage(); + void processed(final Object message, final long messageElapseTimeInNanos) { + if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) { + messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), + messageElapseTimeInNanos)); + } } - private class FailedExpectation implements Error { + public List getMessagesSinceLastExpectedMessage() { + return ImmutableList.copyOf(messagesSinceLastExpectedMessage); + } + private static final class FailedExpectation implements Error { private final Object lastExpectedMessage; private final Object currentExpectedMessage; private final List messagesSinceLastExpectedMessage; private final long expectedTimeInMillis; private final long actualTimeInMillis; - public FailedExpectation(Object lastExpectedMessage, Object message, List messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) { + FailedExpectation(final Object lastExpectedMessage, final Object message, + final List messagesSinceLastExpectedMessage, final long expectedTimeNanos, + final long actualTimeNanos) { this.lastExpectedMessage = lastExpectedMessage; this.currentExpectedMessage = message; - this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage; - this.expectedTimeInMillis = expectedTimeInMillis; - this.actualTimeInMillis = actualTimeInMillis; + this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage); + this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos); + this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos); } + @Override public Object getLastExpectedMessage() { return lastExpectedMessage; } + @Override public Object getCurrentExpectedMessage() { return currentExpectedMessage; } + @Override public List getMessageProcessingTimesSinceLastExpectedMessage() { return messagesSinceLastExpectedMessage; } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("\n> Last Expected Message = " + lastExpectedMessage); - builder.append("\n> Current Expected Message = " + currentExpectedMessage); - builder.append("\n> Expected time in between messages = " + expectedTimeInMillis); - builder.append("\n> Actual time in between messages = " + actualTimeInMillis); + StringBuilder builder = new StringBuilder() + .append("\n> Last Expected Message = ").append(lastExpectedMessage) + .append("\n> Current Expected Message = ").append(currentExpectedMessage) + .append("\n> Expected time in between messages = ").append(expectedTimeInMillis) + .append("\n> Actual time in between messages = ").append(actualTimeInMillis); for (MessageProcessingTime time : messagesSinceLastExpectedMessage) { - builder.append("\n\t> ").append(time.toString()); + builder.append("\n\t> ").append(time); } return builder.toString(); } - - } - - public interface Context { - Context done(); - Optional error(); } - private static class NoOpContext implements Context { + private abstract class AbstractTimedContext extends Context { + abstract Object message(); - @Override - public Context done() { - return this; - } + abstract Stopwatch stopTimer(); @Override - public Optional error() { - return Optional.absent(); + public final void close() { + processed(message(), stopTimer().elapsed(NANOSECONDS)); } } - private class CurrentMessageContext implements Context { - Stopwatch stopwatch = Stopwatch.createStarted(); - boolean done = true; + private final class CurrentMessageContext extends AbstractTimedContext { + private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker); + private Object message; + + void reset(final Object newMessage) { + this.message = requireNonNull(newMessage); + checkState(!stopwatch.isRunning(), "Trying to reset a context that is not done (%s). currentMessage = %s", + this, newMessage); + stopwatch.start(); + } - public void reset(){ - Preconditions.checkState(done, - String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage)); - done = false; - stopwatch.reset().start(); + @Override + Object message() { + return message; } @Override - public Context done() { - processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS)); - done = true; - return this; + Stopwatch stopTimer() { + return stopwatch.stop(); } @Override - public Optional error() { - return Optional.absent(); + public Optional error() { + return Optional.empty(); } } - private class ErrorContext implements Context { - Object message; - private final Optional error; - Stopwatch stopwatch; + private final class ErrorContext extends AbstractTimedContext { + private final Stopwatch stopwatch = Stopwatch.createStarted(ticker); + private final Object message; + private final Error error; + + ErrorContext(final Object message, final Error error) { + this.message = requireNonNull(message); + this.error = requireNonNull(error); + } - ErrorContext(Object message, Optional error){ - this.message = message; - this.error = error; - this.stopwatch = Stopwatch.createStarted(); + @Override + Object message() { + return message; } @Override - public Context done(){ - processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS)); - this.stopwatch.stop(); - return this; + Stopwatch stopTimer() { + return stopwatch.stop(); } @Override - public Optional error() { - return error; + public Optional error() { + return Optional.of(error); } } }