Refactor MessageTracker
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / common / actor / MessageTracker.java
index f046240734762e8611a544526e740bd461378924..08516252fd601608f661f44293efe105de914033 100644 (file)
@@ -8,13 +8,20 @@
 
 package org.opendaylight.controller.cluster.common.actor;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
 import com.google.common.collect.ImmutableList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
@@ -47,25 +54,88 @@ import java.util.concurrent.TimeUnit;
  *
  * </pre>
  */
+@Beta
+@NotThreadSafe
 public final class MessageTracker {
+    public static abstract class Context {
+        Context() {
+            // Hidden to prevent outside instantiation
+        }
 
-    private static final Context NO_OP_CONTEXT = new NoOpContext();
+        public abstract Context done();
+        public abstract Optional<Error> error();
+    }
 
-    private final Class<?> expectedMessageClass;
+    public interface Error {
+        Object getLastExpectedMessage();
+        Object getCurrentExpectedMessage();
+        List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+    }
 
-    private final long expectedArrivalInterval;
+
+    public static final class MessageProcessingTime {
+        private final Class<?> messageClass;
+        private final long elapsedTimeInNanos;
+
+        MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
+            this.messageClass = Preconditions.checkNotNull(messageClass);
+            this.elapsedTimeInNanos = elapsedTimeInNanos;
+        }
+
+        @Override
+        public String toString() {
+            return "MessageProcessingTime{" +
+                    "messageClass=" + messageClass.getSimpleName() +
+                    ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) +
+                    '}';
+        }
+
+        public Class<?> getMessageClass() {
+            return messageClass;
+        }
+
+        public long getElapsedTimeInNanos() {
+            return elapsedTimeInNanos;
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
+    private static final Context NO_OP_CONTEXT = new Context() {
+        @Override
+        public Context done() {
+            return this;
+        }
+
+        @Override
+        public Optional<Error> error() {
+            return Optional.absent();
+        }
+    };
 
     private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
 
-    private Stopwatch expectedMessageWatch;
+    private final CurrentMessageContext currentMessageContext;
 
-    private boolean enabled = false;
+    private final Stopwatch expectedMessageWatch;
 
-    private Object lastExpectedMessage;
+    private final Class<?> expectedMessageClass;
+
+    private final long expectedArrivalInterval;
 
-    private Object currentMessage;
+    private final Ticker ticker;
 
-    private final CurrentMessageContext currentMessageContext = new CurrentMessageContext();
+    private Object lastExpectedMessage;
+
+    @VisibleForTesting
+    MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
+            final Ticker ticker) {
+        Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
+        this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
+        this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
+        this.ticker = Preconditions.checkNotNull(ticker);
+        this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
+        this.currentMessageContext = new CurrentMessageContext();
+    }
 
     /**
      *
@@ -73,108 +143,76 @@ public final class MessageTracker {
      * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
      *                                        message
      */
-    public MessageTracker(Class<?> expectedMessageClass, long expectedArrivalIntervalInMillis){
-        this.expectedMessageClass = expectedMessageClass;
-        this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
+    public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
+        this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
     }
 
-    public void begin(){
-        if(enabled) {
-            return;
+    public void begin() {
+        if (!expectedMessageWatch.isRunning()) {
+            LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
+            expectedMessageWatch.start();
         }
-        enabled = true;
-        expectedMessageWatch = Stopwatch.createStarted();
     }
 
-    public Context received(Object message){
-        if(!enabled) {
+    public Context received(final Object message) {
+        if (!expectedMessageWatch.isRunning()) {
             return NO_OP_CONTEXT;
         }
-        this.currentMessage = message;
-        if(expectedMessageClass.isInstance(message)){
-            long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS);
-            if(actualElapsedTime > expectedArrivalInterval){
-                return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message,
-                        ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval,
-                        actualElapsedTime)));
+
+        if (expectedMessageClass.isInstance(message)) {
+            final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
+            if (actualElapsedTime > expectedArrivalInterval) {
+                return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
+                        messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
             }
-            this.lastExpectedMessage = message;
-            this.messagesSinceLastExpectedMessage.clear();
+            lastExpectedMessage = message;
+            messagesSinceLastExpectedMessage.clear();
+            expectedMessageWatch.reset().start();
         }
 
-        currentMessageContext.reset();
+        currentMessageContext.reset(message);
         return currentMessageContext;
     }
 
-    private void processed(Object message, long messageElapseTimeInNanos){
-        if(!enabled) {
-            return;
-        }
-        if(!expectedMessageClass.isInstance(message)){
-            this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos));
-        }
-    }
-
-    public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage(){
-        return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage);
-    }
-
-    public static class MessageProcessingTime {
-        private final Class<?> messageClass;
-        private final long elapsedTimeInNanos;
-
-        MessageProcessingTime(Class<?> messageClass, long elapsedTimeInNanos){
-            this.messageClass = messageClass;
-            this.elapsedTimeInNanos = elapsedTimeInNanos;
-        }
-
-        @Override
-        public String toString() {
-            return "MessageProcessingTime{" +
-                    "messageClass=" + messageClass.getSimpleName() +
-                    ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) +
-                    '}';
-        }
-
-        public Class<?> getMessageClass() {
-            return messageClass;
-        }
-
-        public long getElapsedTimeInNanos() {
-            return elapsedTimeInNanos;
+    void processed(final Object message, final long messageElapseTimeInNanos) {
+        if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
+            messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
+                messageElapseTimeInNanos));
         }
     }
 
-    public interface Error {
-        Object getLastExpectedMessage();
-        Object getCurrentExpectedMessage();
-        List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+    public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
+        return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
     }
 
-    private class FailedExpectation implements Error {
-
+    private static final class FailedExpectation implements Error {
         private final Object lastExpectedMessage;
         private final Object currentExpectedMessage;
         private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
         private final long expectedTimeInMillis;
         private final long actualTimeInMillis;
 
-        public FailedExpectation(Object lastExpectedMessage, Object message, List<MessageProcessingTime> messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) {
+        FailedExpectation(final Object lastExpectedMessage, final Object message,
+                final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
+                final long actualTimeNanos) {
             this.lastExpectedMessage = lastExpectedMessage;
             this.currentExpectedMessage = message;
-            this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage;
-            this.expectedTimeInMillis = expectedTimeInMillis;
-            this.actualTimeInMillis = actualTimeInMillis;
+            this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
+            this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
+            this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
         }
 
+        @Override
         public Object getLastExpectedMessage() {
             return lastExpectedMessage;
         }
 
+        @Override
         public Object getCurrentExpectedMessage() {
             return currentExpectedMessage;
         }
 
+        @Override
         public List<MessageProcessingTime>  getMessageProcessingTimesSinceLastExpectedMessage() {
             return messagesSinceLastExpectedMessage;
         }
@@ -182,81 +220,78 @@ public final class MessageTracker {
         @Override
         public String toString() {
             StringBuilder builder = new StringBuilder();
-            builder.append("\n> Last Expected Message = " + lastExpectedMessage);
-            builder.append("\n> Current Expected Message = " + currentExpectedMessage);
-            builder.append("\n> Expected time in between messages = " + expectedTimeInMillis);
-            builder.append("\n> Actual time in between messages = " + actualTimeInMillis);
+            builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
+            builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
+            builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
+            builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
             for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
-                builder.append("\n\t> ").append(time.toString());
+                builder.append("\n\t> ").append(time);
             }
             return builder.toString();
         }
-
     }
 
-    public interface Context {
-        Context done();
-        Optional<? extends Error> error();
-    }
-
-    private static class NoOpContext implements Context {
+    private abstract class AbstractTimedContext extends Context {
+        abstract Object message();
+        abstract Stopwatch stopTimer();
 
         @Override
-        public Context done() {
+        public final Context done() {
+            processed(message(), stopTimer().elapsed(NANOSECONDS));
             return this;
         }
-
-        @Override
-        public Optional<Error> error() {
-            return Optional.absent();
-        }
     }
 
-    private class CurrentMessageContext implements Context {
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        boolean done = true;
+    private final class CurrentMessageContext extends AbstractTimedContext {
+        private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
+        private Object message;
 
-        public void reset(){
-            Preconditions.checkState(done,
-                    String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage));
-            done = false;
-            stopwatch.reset().start();
+        void reset(final Object message) {
+            this.message = Preconditions.checkNotNull(message);
+            Preconditions.checkState(!stopwatch.isRunning(),
+                "Trying to reset a context that is not done (%s). currentMessage = %s", this, message);
+            stopwatch.start();
         }
 
         @Override
-        public Context done() {
-            processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS));
-            done = true;
-            return this;
+        Object message() {
+            return message;
+        }
+
+        @Override
+        Stopwatch stopTimer() {
+            return stopwatch.stop();
         }
 
         @Override
-        public Optional<? extends Error> error() {
+        public Optional<Error> error() {
             return Optional.absent();
         }
     }
 
-    private class ErrorContext implements Context {
-        Object message;
-        private final Optional<? extends Error> error;
-        Stopwatch stopwatch;
+    private final class ErrorContext extends AbstractTimedContext {
+        private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
+        private final Object message;
+        private final Error error;
 
-        ErrorContext(Object message, Optional<? extends Error> error){
-            this.message = message;
-            this.error = error;
-            this.stopwatch = Stopwatch.createStarted();
+        ErrorContext(final Object message, final Error error) {
+            this.message = Preconditions.checkNotNull(message);
+            this.error = Preconditions.checkNotNull(error);
         }
 
         @Override
-        public Context done(){
-            processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS));
-            this.stopwatch.stop();
-            return this;
+        Object message() {
+            return message;
+        }
+
+        @Override
+        Stopwatch stopTimer() {
+            return stopwatch.stop();
         }
 
         @Override
-        public Optional<? extends Error> error() {
-            return error;
+        public Optional<Error> error() {
+            return Optional.of(error);
         }
     }
 }