From: Robert Varga Date: Mon, 21 Mar 2016 19:34:12 +0000 (+0100) Subject: Refactor MessageTracker X-Git-Tag: release/boron~279 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=2e75f2a8df84ff50d2b608c3346978d81378ee98 Refactor MessageTracker This patch updates MessageTracker implementation with the following: - lower overhead when dealing with StopWatch under normal conditions - prevent Context from being instantiated from the outside - use a Ticker instance for reliable testing - use Stopwatch.isRunning() instead of explicit done/enable boolean - properly reset interval timer when expected message is received - add explicit @NotThreadSafe and @Beta annotations - move currentMessage to CurrentMessageContext Change-Id: Idd553a39a8fb885a5c05e391cb4ae45384a59f07 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java index f046240734..08516252fd 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java @@ -8,13 +8,20 @@ package org.opendaylight.controller.cluster.common.actor; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.NotThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was @@ -47,25 +54,88 @@ import java.util.concurrent.TimeUnit; * * */ +@Beta +@NotThreadSafe public final class MessageTracker { + public static abstract class Context { + Context() { + // Hidden to prevent outside instantiation + } - private static final Context NO_OP_CONTEXT = new NoOpContext(); + public abstract Context done(); + public abstract Optional error(); + } - private final Class expectedMessageClass; + public interface Error { + Object getLastExpectedMessage(); + Object getCurrentExpectedMessage(); + List getMessageProcessingTimesSinceLastExpectedMessage(); + } - private final long expectedArrivalInterval; + + public static final class MessageProcessingTime { + private final Class messageClass; + private final long elapsedTimeInNanos; + + MessageProcessingTime(final Class messageClass, final long elapsedTimeInNanos) { + this.messageClass = Preconditions.checkNotNull(messageClass); + this.elapsedTimeInNanos = elapsedTimeInNanos; + } + + @Override + public String toString() { + return "MessageProcessingTime{" + + "messageClass=" + messageClass.getSimpleName() + + ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) + + '}'; + } + + public Class getMessageClass() { + return messageClass; + } + + public long getElapsedTimeInNanos() { + return elapsedTimeInNanos; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class); + private static final Context NO_OP_CONTEXT = new Context() { + @Override + public Context done() { + return this; + } + + @Override + public Optional error() { + return Optional.absent(); + } + }; private final List messagesSinceLastExpectedMessage = new LinkedList<>(); - private Stopwatch expectedMessageWatch; + private final CurrentMessageContext currentMessageContext; - private boolean enabled = false; + private final Stopwatch expectedMessageWatch; - private Object lastExpectedMessage; + private final Class expectedMessageClass; + + private final long expectedArrivalInterval; - private Object currentMessage; + private final Ticker ticker; - private final CurrentMessageContext currentMessageContext = new CurrentMessageContext(); + private Object lastExpectedMessage; + + @VisibleForTesting + MessageTracker(final Class expectedMessageClass, final long expectedArrivalIntervalInMillis, + final Ticker ticker) { + Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0); + this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass); + this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis); + this.ticker = Preconditions.checkNotNull(ticker); + this.expectedMessageWatch = Stopwatch.createUnstarted(ticker); + this.currentMessageContext = new CurrentMessageContext(); + } /** * @@ -73,108 +143,76 @@ public final class MessageTracker { * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected * message */ - public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){ - this.expectedMessageClass = expectedMessageClass; - this.expectedArrivalInterval = expectedArrivalIntervalInMillis; + public MessageTracker(final Class expectedMessageClass, final long expectedArrivalIntervalInMillis) { + this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker()); } - public void begin(){ - if(enabled) { - return; + public void begin() { + if (!expectedMessageWatch.isRunning()) { + LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval); + expectedMessageWatch.start(); } - enabled = true; - expectedMessageWatch = Stopwatch.createStarted(); } - public Context received(Object message){ - if(!enabled) { + public Context received(final Object message) { + if (!expectedMessageWatch.isRunning()) { return NO_OP_CONTEXT; } - this.currentMessage = message; - if(expectedMessageClass.isInstance(message)){ - long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS); - if(actualElapsedTime > expectedArrivalInterval){ - return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message, - ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval, - actualElapsedTime))); + + if (expectedMessageClass.isInstance(message)) { + final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS); + if (actualElapsedTime > expectedArrivalInterval) { + return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message, + messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime)); } - this.lastExpectedMessage = message; - this.messagesSinceLastExpectedMessage.clear(); + lastExpectedMessage = message; + messagesSinceLastExpectedMessage.clear(); + expectedMessageWatch.reset().start(); } - currentMessageContext.reset(); + currentMessageContext.reset(message); return currentMessageContext; } - private void processed(Object message, long messageElapseTimeInNanos){ - if(!enabled) { - return; - } - if(!expectedMessageClass.isInstance(message)){ - this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos)); - } - } - - public List getMessagesSinceLastExpectedMessage(){ - return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage); - } - - public static class MessageProcessingTime { - private final Class messageClass; - private final long elapsedTimeInNanos; - - MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){ - this.messageClass = messageClass; - this.elapsedTimeInNanos = elapsedTimeInNanos; - } - - @Override - public String toString() { - return "MessageProcessingTime{" + - "messageClass=" + messageClass.getSimpleName() + - ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) + - '}'; - } - - public Class getMessageClass() { - return messageClass; - } - - public long getElapsedTimeInNanos() { - return elapsedTimeInNanos; + void processed(final Object message, final long messageElapseTimeInNanos) { + if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) { + messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), + messageElapseTimeInNanos)); } } - public interface Error { - Object getLastExpectedMessage(); - Object getCurrentExpectedMessage(); - List getMessageProcessingTimesSinceLastExpectedMessage(); + public List getMessagesSinceLastExpectedMessage() { + return ImmutableList.copyOf(messagesSinceLastExpectedMessage); } - private class FailedExpectation implements Error { - + private static final class FailedExpectation implements Error { private final Object lastExpectedMessage; private final Object currentExpectedMessage; private final List messagesSinceLastExpectedMessage; private final long expectedTimeInMillis; private final long actualTimeInMillis; - public FailedExpectation(Object lastExpectedMessage, Object message, List messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) { + FailedExpectation(final Object lastExpectedMessage, final Object message, + final List messagesSinceLastExpectedMessage, final long expectedTimeNanos, + final long actualTimeNanos) { this.lastExpectedMessage = lastExpectedMessage; this.currentExpectedMessage = message; - this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage; - this.expectedTimeInMillis = expectedTimeInMillis; - this.actualTimeInMillis = actualTimeInMillis; + this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage); + this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos); + this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos); } + @Override public Object getLastExpectedMessage() { return lastExpectedMessage; } + @Override public Object getCurrentExpectedMessage() { return currentExpectedMessage; } + @Override public List getMessageProcessingTimesSinceLastExpectedMessage() { return messagesSinceLastExpectedMessage; } @@ -182,81 +220,78 @@ public final class MessageTracker { @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("\n> Last Expected Message = " + lastExpectedMessage); - builder.append("\n> Current Expected Message = " + currentExpectedMessage); - builder.append("\n> Expected time in between messages = " + expectedTimeInMillis); - builder.append("\n> Actual time in between messages = " + actualTimeInMillis); + builder.append("\n> Last Expected Message = ").append(lastExpectedMessage); + builder.append("\n> Current Expected Message = ").append(currentExpectedMessage); + builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis); + builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis); for (MessageProcessingTime time : messagesSinceLastExpectedMessage) { - builder.append("\n\t> ").append(time.toString()); + builder.append("\n\t> ").append(time); } return builder.toString(); } - } - public interface Context { - Context done(); - Optional error(); - } - - private static class NoOpContext implements Context { + private abstract class AbstractTimedContext extends Context { + abstract Object message(); + abstract Stopwatch stopTimer(); @Override - public Context done() { + public final Context done() { + processed(message(), stopTimer().elapsed(NANOSECONDS)); return this; } - - @Override - public Optional error() { - return Optional.absent(); - } } - private class CurrentMessageContext implements Context { - Stopwatch stopwatch = Stopwatch.createStarted(); - boolean done = true; + private final class CurrentMessageContext extends AbstractTimedContext { + private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker); + private Object message; - public void reset(){ - Preconditions.checkState(done, - String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage)); - done = false; - stopwatch.reset().start(); + void reset(final Object message) { + this.message = Preconditions.checkNotNull(message); + Preconditions.checkState(!stopwatch.isRunning(), + "Trying to reset a context that is not done (%s). currentMessage = %s", this, message); + stopwatch.start(); } @Override - public Context done() { - processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS)); - done = true; - return this; + Object message() { + return message; + } + + @Override + Stopwatch stopTimer() { + return stopwatch.stop(); } @Override - public Optional error() { + public Optional error() { return Optional.absent(); } } - private class ErrorContext implements Context { - Object message; - private final Optional error; - Stopwatch stopwatch; + private final class ErrorContext extends AbstractTimedContext { + private final Stopwatch stopwatch = Stopwatch.createStarted(ticker); + private final Object message; + private final Error error; - ErrorContext(Object message, Optional error){ - this.message = message; - this.error = error; - this.stopwatch = Stopwatch.createStarted(); + ErrorContext(final Object message, final Error error) { + this.message = Preconditions.checkNotNull(message); + this.error = Preconditions.checkNotNull(error); } @Override - public Context done(){ - processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS)); - this.stopwatch.stop(); - return this; + Object message() { + return message; + } + + @Override + Stopwatch stopTimer() { + return stopwatch.stop(); } @Override - public Optional error() { - return error; + public Optional error() { + return Optional.of(error); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index ab878055ae..474c794b46 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MessageTracker; +import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; @@ -210,11 +211,11 @@ public class Shard extends RaftActor { @Override protected void handleCommand(final Object message) { - MessageTracker.Context context = appendEntriesReplyTracker.received(message); - - if(context.error().isPresent()){ + final MessageTracker.Context context = appendEntriesReplyTracker.received(message); + final Optional maybeError = context.error(); + if (maybeError.isPresent()) { LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(), - context.error()); + maybeError.get()); } try { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/common/actor/MessageTrackerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/common/actor/MessageTrackerTest.java index 12e79d2e00..8a9d789fde 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/common/actor/MessageTrackerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/common/actor/MessageTrackerTest.java @@ -8,47 +8,65 @@ package org.opendaylight.controller.cluster.common.actor; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import com.google.common.util.concurrent.Uninterruptibles; +import com.google.common.base.Ticker; import java.util.List; -import java.util.concurrent.TimeUnit; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MessageTrackerTest { + private static final class TestTicker extends Ticker { + private long ticks; + + @Override + public long read() { + return ticks; + } + + void increment(final long ticks) { + this.ticks += ticks; + } + } + + private static final class Foo { + // Intentionally empty + } + private final static Logger LOG = LoggerFactory.getLogger(MessageTrackerTest.class); - private class Foo { + private TestTicker ticker; + private MessageTracker messageTracker; + @Before + public void setup() { + ticker = new TestTicker(); + messageTracker = new MessageTracker(Foo.class, 10, ticker); } @Test public void testNoTracking() { - MessageTracker messageTracker = new MessageTracker(Foo.class, 10); - MessageTracker.Context context1 = messageTracker.received(new Foo()); context1.done(); - Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); - + ticker.increment(MILLISECONDS.toNanos(20)); MessageTracker.Context context2 = messageTracker.received(new Foo()); context2.done(); - } @Test public void testFailedExpectationOnTracking() { - MessageTracker messageTracker = new MessageTracker(Foo.class, 10); messageTracker.begin(); MessageTracker.Context context1 = messageTracker.received(new Foo()); context1.done(); - Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + ticker.increment(MILLISECONDS.toNanos(20)); MessageTracker.Context context2 = messageTracker.received(new Foo()); Assert.assertEquals(true, context2.error().isPresent()); @@ -58,17 +76,16 @@ public class MessageTrackerTest { @Test public void testFailedExpectationOnTrackingWithMessagesInBetween() { - MessageTracker messageTracker = new MessageTracker(Foo.class, 10); messageTracker.begin(); MessageTracker.Context context1 = messageTracker.received(new Foo()); context1.done(); messageTracker.received("A").done(); - messageTracker.received(Long.valueOf(10)).done(); - MessageTracker.Context c = messageTracker.received(Integer.valueOf(100)); + messageTracker.received(10L).done(); + MessageTracker.Context c = messageTracker.received(100); - Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + ticker.increment(MILLISECONDS.toNanos(20)); c.done(); @@ -86,7 +103,7 @@ public class MessageTrackerTest { Assert.assertEquals(String.class, messageProcessingTimes.get(0).getMessageClass()); Assert.assertEquals(Long.class, messageProcessingTimes.get(1).getMessageClass()); Assert.assertEquals(Integer.class, messageProcessingTimes.get(2).getMessageClass()); - Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > TimeUnit.MILLISECONDS.toNanos(10)); + Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > MILLISECONDS.toNanos(10)); Assert.assertEquals(Foo.class, error.getLastExpectedMessage().getClass()); Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass()); @@ -96,13 +113,12 @@ public class MessageTrackerTest { @Test public void testMetExpectationOnTracking() { - MessageTracker messageTracker = new MessageTracker(Foo.class, 10); messageTracker.begin(); MessageTracker.Context context1 = messageTracker.received(new Foo()); context1.done(); - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); + ticker.increment(MILLISECONDS.toNanos(1)); MessageTracker.Context context2 = messageTracker.received(new Foo()); Assert.assertEquals(false, context2.error().isPresent()); @@ -111,7 +127,6 @@ public class MessageTrackerTest { @Test public void testIllegalStateExceptionWhenDoneIsNotCalledWhileTracking() { - MessageTracker messageTracker = new MessageTracker(Foo.class, 10); messageTracker.begin(); messageTracker.received(new Foo()); @@ -126,19 +141,15 @@ public class MessageTrackerTest { @Test public void testNoIllegalStateExceptionWhenDoneIsNotCalledWhileNotTracking() { - MessageTracker messageTracker = new MessageTracker(Foo.class, 10); - messageTracker.received(new Foo()); messageTracker.received(new Foo()); } @Test public void testDelayInFirstExpectedMessageArrival(){ - - MessageTracker messageTracker = new MessageTracker(Foo.class, 10); messageTracker.begin(); - Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + ticker.increment(MILLISECONDS.toNanos(20)); MessageTracker.Context context = messageTracker.received(new Foo()); @@ -157,10 +168,9 @@ public class MessageTrackerTest { @Test public void testCallingBeginDoesNotResetWatch() { - MessageTracker messageTracker = new MessageTracker(Foo.class, 10); messageTracker.begin(); - Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + ticker.increment(MILLISECONDS.toNanos(20)); messageTracker.begin(); @@ -173,7 +183,6 @@ public class MessageTrackerTest { @Test public void testMessagesSinceLastExpectedMessage() { - MessageTracker messageTracker = new MessageTracker(Foo.class, 10); messageTracker.begin(); MessageTracker.Context context1 = messageTracker.received(Integer.valueOf(45)).done();