Refactor MessageTracker 11/36511/5
authorRobert Varga <rovarga@cisco.com>
Mon, 21 Mar 2016 19:34:12 +0000 (20:34 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 29 Mar 2016 22:07:18 +0000 (22:07 +0000)
This patch updates MessageTracker implementation with the following:
- lower overhead when dealing with StopWatch under normal conditions
- prevent Context from being instantiated from the outside
- use a Ticker instance for reliable testing
- use Stopwatch.isRunning() instead of explicit done/enable boolean
- properly reset interval timer when expected message is received
- add explicit @NotThreadSafe and @Beta annotations
- move currentMessage to CurrentMessageContext

Change-Id: Idd553a39a8fb885a5c05e391cb4ae45384a59f07
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MessageTracker.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/common/actor/MessageTrackerTest.java

index f046240..0851625 100644 (file)
@@ -8,13 +8,20 @@
 
 package org.opendaylight.controller.cluster.common.actor;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
 import com.google.common.collect.ImmutableList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
@@ -47,25 +54,88 @@ import java.util.concurrent.TimeUnit;
  *
  * </pre>
  */
+@Beta
+@NotThreadSafe
 public final class MessageTracker {
+    public static abstract class Context {
+        Context() {
+            // Hidden to prevent outside instantiation
+        }
 
-    private static final Context NO_OP_CONTEXT = new NoOpContext();
+        public abstract Context done();
+        public abstract Optional<Error> error();
+    }
 
-    private final Class<?> expectedMessageClass;
+    public interface Error {
+        Object getLastExpectedMessage();
+        Object getCurrentExpectedMessage();
+        List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+    }
 
-    private final long expectedArrivalInterval;
+
+    public static final class MessageProcessingTime {
+        private final Class<?> messageClass;
+        private final long elapsedTimeInNanos;
+
+        MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
+            this.messageClass = Preconditions.checkNotNull(messageClass);
+            this.elapsedTimeInNanos = elapsedTimeInNanos;
+        }
+
+        @Override
+        public String toString() {
+            return "MessageProcessingTime{" +
+                    "messageClass=" + messageClass.getSimpleName() +
+                    ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) +
+                    '}';
+        }
+
+        public Class<?> getMessageClass() {
+            return messageClass;
+        }
+
+        public long getElapsedTimeInNanos() {
+            return elapsedTimeInNanos;
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
+    private static final Context NO_OP_CONTEXT = new Context() {
+        @Override
+        public Context done() {
+            return this;
+        }
+
+        @Override
+        public Optional<Error> error() {
+            return Optional.absent();
+        }
+    };
 
     private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
 
-    private Stopwatch expectedMessageWatch;
+    private final CurrentMessageContext currentMessageContext;
 
-    private boolean enabled = false;
+    private final Stopwatch expectedMessageWatch;
 
-    private Object lastExpectedMessage;
+    private final Class<?> expectedMessageClass;
+
+    private final long expectedArrivalInterval;
 
-    private Object currentMessage;
+    private final Ticker ticker;
 
-    private final CurrentMessageContext currentMessageContext = new CurrentMessageContext();
+    private Object lastExpectedMessage;
+
+    @VisibleForTesting
+    MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
+            final Ticker ticker) {
+        Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
+        this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
+        this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
+        this.ticker = Preconditions.checkNotNull(ticker);
+        this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
+        this.currentMessageContext = new CurrentMessageContext();
+    }
 
     /**
      *
@@ -73,108 +143,76 @@ public final class MessageTracker {
      * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
      *                                        message
      */
-    public MessageTracker(Class<?> expectedMessageClass, long expectedArrivalIntervalInMillis){
-        this.expectedMessageClass = expectedMessageClass;
-        this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
+    public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
+        this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
     }
 
-    public void begin(){
-        if(enabled) {
-            return;
+    public void begin() {
+        if (!expectedMessageWatch.isRunning()) {
+            LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
+            expectedMessageWatch.start();
         }
-        enabled = true;
-        expectedMessageWatch = Stopwatch.createStarted();
     }
 
-    public Context received(Object message){
-        if(!enabled) {
+    public Context received(final Object message) {
+        if (!expectedMessageWatch.isRunning()) {
             return NO_OP_CONTEXT;
         }
-        this.currentMessage = message;
-        if(expectedMessageClass.isInstance(message)){
-            long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS);
-            if(actualElapsedTime > expectedArrivalInterval){
-                return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message,
-                        ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval,
-                        actualElapsedTime)));
+
+        if (expectedMessageClass.isInstance(message)) {
+            final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
+            if (actualElapsedTime > expectedArrivalInterval) {
+                return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
+                        messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
             }
-            this.lastExpectedMessage = message;
-            this.messagesSinceLastExpectedMessage.clear();
+            lastExpectedMessage = message;
+            messagesSinceLastExpectedMessage.clear();
+            expectedMessageWatch.reset().start();
         }
 
-        currentMessageContext.reset();
+        currentMessageContext.reset(message);
         return currentMessageContext;
     }
 
-    private void processed(Object message, long messageElapseTimeInNanos){
-        if(!enabled) {
-            return;
-        }
-        if(!expectedMessageClass.isInstance(message)){
-            this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos));
-        }
-    }
-
-    public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage(){
-        return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage);
-    }
-
-    public static class MessageProcessingTime {
-        private final Class<?> messageClass;
-        private final long elapsedTimeInNanos;
-
-        MessageProcessingTime(Class<?> messageClass, long elapsedTimeInNanos){
-            this.messageClass = messageClass;
-            this.elapsedTimeInNanos = elapsedTimeInNanos;
-        }
-
-        @Override
-        public String toString() {
-            return "MessageProcessingTime{" +
-                    "messageClass=" + messageClass.getSimpleName() +
-                    ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) +
-                    '}';
-        }
-
-        public Class<?> getMessageClass() {
-            return messageClass;
-        }
-
-        public long getElapsedTimeInNanos() {
-            return elapsedTimeInNanos;
+    void processed(final Object message, final long messageElapseTimeInNanos) {
+        if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
+            messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
+                messageElapseTimeInNanos));
         }
     }
 
-    public interface Error {
-        Object getLastExpectedMessage();
-        Object getCurrentExpectedMessage();
-        List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+    public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
+        return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
     }
 
-    private class FailedExpectation implements Error {
-
+    private static final class FailedExpectation implements Error {
         private final Object lastExpectedMessage;
         private final Object currentExpectedMessage;
         private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
         private final long expectedTimeInMillis;
         private final long actualTimeInMillis;
 
-        public FailedExpectation(Object lastExpectedMessage, Object message, List<MessageProcessingTime> messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) {
+        FailedExpectation(final Object lastExpectedMessage, final Object message,
+                final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
+                final long actualTimeNanos) {
             this.lastExpectedMessage = lastExpectedMessage;
             this.currentExpectedMessage = message;
-            this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage;
-            this.expectedTimeInMillis = expectedTimeInMillis;
-            this.actualTimeInMillis = actualTimeInMillis;
+            this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
+            this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
+            this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
         }
 
+        @Override
         public Object getLastExpectedMessage() {
             return lastExpectedMessage;
         }
 
+        @Override
         public Object getCurrentExpectedMessage() {
             return currentExpectedMessage;
         }
 
+        @Override
         public List<MessageProcessingTime>  getMessageProcessingTimesSinceLastExpectedMessage() {
             return messagesSinceLastExpectedMessage;
         }
@@ -182,81 +220,78 @@ public final class MessageTracker {
         @Override
         public String toString() {
             StringBuilder builder = new StringBuilder();
-            builder.append("\n> Last Expected Message = " + lastExpectedMessage);
-            builder.append("\n> Current Expected Message = " + currentExpectedMessage);
-            builder.append("\n> Expected time in between messages = " + expectedTimeInMillis);
-            builder.append("\n> Actual time in between messages = " + actualTimeInMillis);
+            builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
+            builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
+            builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
+            builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
             for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
-                builder.append("\n\t> ").append(time.toString());
+                builder.append("\n\t> ").append(time);
             }
             return builder.toString();
         }
-
     }
 
-    public interface Context {
-        Context done();
-        Optional<? extends Error> error();
-    }
-
-    private static class NoOpContext implements Context {
+    private abstract class AbstractTimedContext extends Context {
+        abstract Object message();
+        abstract Stopwatch stopTimer();
 
         @Override
-        public Context done() {
+        public final Context done() {
+            processed(message(), stopTimer().elapsed(NANOSECONDS));
             return this;
         }
-
-        @Override
-        public Optional<Error> error() {
-            return Optional.absent();
-        }
     }
 
-    private class CurrentMessageContext implements Context {
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        boolean done = true;
+    private final class CurrentMessageContext extends AbstractTimedContext {
+        private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
+        private Object message;
 
-        public void reset(){
-            Preconditions.checkState(done,
-                    String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage));
-            done = false;
-            stopwatch.reset().start();
+        void reset(final Object message) {
+            this.message = Preconditions.checkNotNull(message);
+            Preconditions.checkState(!stopwatch.isRunning(),
+                "Trying to reset a context that is not done (%s). currentMessage = %s", this, message);
+            stopwatch.start();
         }
 
         @Override
-        public Context done() {
-            processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS));
-            done = true;
-            return this;
+        Object message() {
+            return message;
+        }
+
+        @Override
+        Stopwatch stopTimer() {
+            return stopwatch.stop();
         }
 
         @Override
-        public Optional<? extends Error> error() {
+        public Optional<Error> error() {
             return Optional.absent();
         }
     }
 
-    private class ErrorContext implements Context {
-        Object message;
-        private final Optional<? extends Error> error;
-        Stopwatch stopwatch;
+    private final class ErrorContext extends AbstractTimedContext {
+        private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
+        private final Object message;
+        private final Error error;
 
-        ErrorContext(Object message, Optional<? extends Error> error){
-            this.message = message;
-            this.error = error;
-            this.stopwatch = Stopwatch.createStarted();
+        ErrorContext(final Object message, final Error error) {
+            this.message = Preconditions.checkNotNull(message);
+            this.error = Preconditions.checkNotNull(error);
         }
 
         @Override
-        public Context done(){
-            processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS));
-            this.stopwatch.stop();
-            return this;
+        Object message() {
+            return message;
+        }
+
+        @Override
+        Stopwatch stopTimer() {
+            return stopwatch.stop();
         }
 
         @Override
-        public Optional<? extends Error> error() {
-            return error;
+        public Optional<Error> error() {
+            return Optional.of(error);
         }
     }
 }
index ab87805..474c794 100644 (file)
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker;
+import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
@@ -210,11 +211,11 @@ public class Shard extends RaftActor {
     @Override
     protected void handleCommand(final Object message) {
 
-        MessageTracker.Context context = appendEntriesReplyTracker.received(message);
-
-        if(context.error().isPresent()){
+        final MessageTracker.Context context = appendEntriesReplyTracker.received(message);
+        final Optional<Error> maybeError = context.error();
+        if (maybeError.isPresent()) {
             LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
-                context.error());
+                maybeError.get());
         }
 
         try {
index 12e79d2..8a9d789 100644 (file)
@@ -8,47 +8,65 @@
 
 package org.opendaylight.controller.cluster.common.actor;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.common.base.Ticker;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MessageTrackerTest {
 
+    private static final class TestTicker extends Ticker {
+        private long ticks;
+
+        @Override
+        public long read() {
+            return ticks;
+        }
+
+        void increment(final long ticks) {
+            this.ticks += ticks;
+        }
+    }
+
+    private static final class Foo {
+        // Intentionally empty
+    }
+
     private final static Logger LOG = LoggerFactory.getLogger(MessageTrackerTest.class);
 
-    private class Foo {
+    private TestTicker ticker;
+    private MessageTracker messageTracker;
 
+    @Before
+    public void setup() {
+        ticker = new TestTicker();
+        messageTracker = new MessageTracker(Foo.class, 10, ticker);
     }
 
     @Test
     public void testNoTracking() {
-        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
-
         MessageTracker.Context context1 = messageTracker.received(new Foo());
         context1.done();
 
-        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
-
+        ticker.increment(MILLISECONDS.toNanos(20));
         MessageTracker.Context context2 = messageTracker.received(new Foo());
         context2.done();
-
     }
 
     @Test
     public void testFailedExpectationOnTracking() {
-        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
         messageTracker.begin();
 
         MessageTracker.Context context1 = messageTracker.received(new Foo());
         context1.done();
 
-        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+        ticker.increment(MILLISECONDS.toNanos(20));
 
         MessageTracker.Context context2 = messageTracker.received(new Foo());
         Assert.assertEquals(true, context2.error().isPresent());
@@ -58,17 +76,16 @@ public class MessageTrackerTest {
 
     @Test
     public void testFailedExpectationOnTrackingWithMessagesInBetween() {
-        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
         messageTracker.begin();
 
         MessageTracker.Context context1 = messageTracker.received(new Foo());
         context1.done();
 
         messageTracker.received("A").done();
-        messageTracker.received(Long.valueOf(10)).done();
-        MessageTracker.Context c = messageTracker.received(Integer.valueOf(100));
+        messageTracker.received(10L).done();
+        MessageTracker.Context c = messageTracker.received(100);
 
-        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+        ticker.increment(MILLISECONDS.toNanos(20));
 
         c.done();
 
@@ -86,7 +103,7 @@ public class MessageTrackerTest {
         Assert.assertEquals(String.class, messageProcessingTimes.get(0).getMessageClass());
         Assert.assertEquals(Long.class, messageProcessingTimes.get(1).getMessageClass());
         Assert.assertEquals(Integer.class, messageProcessingTimes.get(2).getMessageClass());
-        Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > TimeUnit.MILLISECONDS.toNanos(10));
+        Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > MILLISECONDS.toNanos(10));
         Assert.assertEquals(Foo.class, error.getLastExpectedMessage().getClass());
         Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass());
 
@@ -96,13 +113,12 @@ public class MessageTrackerTest {
 
     @Test
     public void testMetExpectationOnTracking() {
-        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
         messageTracker.begin();
 
         MessageTracker.Context context1 = messageTracker.received(new Foo());
         context1.done();
 
-        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+        ticker.increment(MILLISECONDS.toNanos(1));
 
         MessageTracker.Context context2 = messageTracker.received(new Foo());
         Assert.assertEquals(false, context2.error().isPresent());
@@ -111,7 +127,6 @@ public class MessageTrackerTest {
 
     @Test
     public void testIllegalStateExceptionWhenDoneIsNotCalledWhileTracking() {
-        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
         messageTracker.begin();
 
         messageTracker.received(new Foo());
@@ -126,19 +141,15 @@ public class MessageTrackerTest {
 
     @Test
     public void testNoIllegalStateExceptionWhenDoneIsNotCalledWhileNotTracking() {
-        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
-
         messageTracker.received(new Foo());
         messageTracker.received(new Foo());
     }
 
     @Test
     public void testDelayInFirstExpectedMessageArrival(){
-
-        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
         messageTracker.begin();
 
-        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+        ticker.increment(MILLISECONDS.toNanos(20));
 
         MessageTracker.Context context = messageTracker.received(new Foo());
 
@@ -157,10 +168,9 @@ public class MessageTrackerTest {
 
     @Test
     public void testCallingBeginDoesNotResetWatch() {
-        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
         messageTracker.begin();
 
-        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+        ticker.increment(MILLISECONDS.toNanos(20));
 
         messageTracker.begin();
 
@@ -173,7 +183,6 @@ public class MessageTrackerTest {
     @Test
     public void testMessagesSinceLastExpectedMessage() {
 
-        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
         messageTracker.begin();
 
         MessageTracker.Context context1 = messageTracker.received(Integer.valueOf(45)).done();