Fix findbugs warnings
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / common / actor / MessageTracker.java
index f046240734762e8611a544526e740bd461378924..65cef56a2bb9624a7b30d847b5932d1c79f82d14 100644 (file)
@@ -8,20 +8,28 @@
 
 package org.opendaylight.controller.cluster.common.actor;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
 import com.google.common.collect.ImmutableList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
  * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
  * received between the arrival of two instances of the same message and the amount of time it took to process each
  * of those messages.
- * <br/>
+ * <br>
  * Usage of the API is as follows,
  * <pre>
  *
@@ -34,147 +42,181 @@ import java.util.concurrent.TimeUnit;
  *
  *     .....
  *
- *     MessageTracker.Context context = tracker.received(message);
- *
- *     if(context.error().isPresent()){
- *         LOG.error("{}", context.error().get());
- *     }
+ *     try (MessageTracker.Context context = tracker.received(message)) {
  *
- *     // Some custom processing
- *     process(message);
+ *         if (context.error().isPresent()){
+ *             LOG.error("{}", context.error().get());
+ *         }
  *
- *     context.done();
+ *         // Some custom processing
+ *         process(message);
+ *     }
  *
  * </pre>
  */
+@Beta
+@NotThreadSafe
 public final class MessageTracker {
+    public abstract static class Context implements AutoCloseable {
+        Context() {
+            // Hidden to prevent outside instantiation
+        }
 
-    private static final Context NO_OP_CONTEXT = new NoOpContext();
+        public abstract Optional<Error> error();
 
-    private final Class<?> expectedMessageClass;
+        @Override
+        public abstract void close();
+    }
 
-    private final long expectedArrivalInterval;
+    public interface Error {
+        Object getLastExpectedMessage();
 
-    private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
+        Object getCurrentExpectedMessage();
 
-    private Stopwatch expectedMessageWatch;
+        List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+    }
 
-    private boolean enabled = false;
 
-    private Object lastExpectedMessage;
+    public static final class MessageProcessingTime {
+        private final Class<?> messageClass;
+        private final long elapsedTimeInNanos;
 
-    private Object currentMessage;
+        MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
+            this.messageClass = Preconditions.checkNotNull(messageClass);
+            this.elapsedTimeInNanos = elapsedTimeInNanos;
+        }
 
-    private final CurrentMessageContext currentMessageContext = new CurrentMessageContext();
+        @Override
+        public String toString() {
+            return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis="
+                   + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]";
+        }
 
-    /**
-     *
-     * @param expectedMessageClass The class of the message to track
-     * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
-     *                                        message
-     */
-    public MessageTracker(Class<?> expectedMessageClass, long expectedArrivalIntervalInMillis){
-        this.expectedMessageClass = expectedMessageClass;
-        this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
-    }
 
-    public void begin(){
-        if(enabled) {
-            return;
+        public Class<?> getMessageClass() {
+            return messageClass;
         }
-        enabled = true;
-        expectedMessageWatch = Stopwatch.createStarted();
-    }
 
-    public Context received(Object message){
-        if(!enabled) {
-            return NO_OP_CONTEXT;
-        }
-        this.currentMessage = message;
-        if(expectedMessageClass.isInstance(message)){
-            long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS);
-            if(actualElapsedTime > expectedArrivalInterval){
-                return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message,
-                        ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval,
-                        actualElapsedTime)));
-            }
-            this.lastExpectedMessage = message;
-            this.messagesSinceLastExpectedMessage.clear();
+        public long getElapsedTimeInNanos() {
+            return elapsedTimeInNanos;
         }
-
-        currentMessageContext.reset();
-        return currentMessageContext;
     }
 
-    private void processed(Object message, long messageElapseTimeInNanos){
-        if(!enabled) {
-            return;
+    private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
+    private static final Context NO_OP_CONTEXT = new Context() {
+        @Override
+        public void close() {
+            // No-op
         }
-        if(!expectedMessageClass.isInstance(message)){
-            this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos));
+
+        @Override
+        public Optional<Error> error() {
+            return Optional.absent();
         }
-    }
+    };
+
+    private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
+
+    private final CurrentMessageContext currentMessageContext;
+
+    private final Stopwatch expectedMessageWatch;
+
+    private final Class<?> expectedMessageClass;
+
+    private final long expectedArrivalInterval;
+
+    private final Ticker ticker;
 
-    public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage(){
-        return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage);
+    private Object lastExpectedMessage;
+
+    @VisibleForTesting
+    MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
+            final Ticker ticker) {
+        Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
+        this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
+        this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
+        this.ticker = Preconditions.checkNotNull(ticker);
+        this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
+        this.currentMessageContext = new CurrentMessageContext();
     }
 
-    public static class MessageProcessingTime {
-        private final Class<?> messageClass;
-        private final long elapsedTimeInNanos;
+    /**
+     * Constructs an instance.
+     *
+     * @param expectedMessageClass the class of the message to track
+     * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected
+     *                                        message
+     */
+    public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
+        this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
+    }
 
-        MessageProcessingTime(Class<?> messageClass, long elapsedTimeInNanos){
-            this.messageClass = messageClass;
-            this.elapsedTimeInNanos = elapsedTimeInNanos;
+    public void begin() {
+        if (!expectedMessageWatch.isRunning()) {
+            LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
+            expectedMessageWatch.start();
         }
+    }
 
-        @Override
-        public String toString() {
-            return "MessageProcessingTime{" +
-                    "messageClass=" + messageClass.getSimpleName() +
-                    ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) +
-                    '}';
+    public Context received(final Object message) {
+        if (!expectedMessageWatch.isRunning()) {
+            return NO_OP_CONTEXT;
         }
 
-        public Class<?> getMessageClass() {
-            return messageClass;
+        if (expectedMessageClass.isInstance(message)) {
+            final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
+            if (actualElapsedTime > expectedArrivalInterval) {
+                return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
+                        messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
+            }
+            lastExpectedMessage = message;
+            messagesSinceLastExpectedMessage.clear();
+            expectedMessageWatch.reset().start();
         }
 
-        public long getElapsedTimeInNanos() {
-            return elapsedTimeInNanos;
-        }
+        currentMessageContext.reset(message);
+        return currentMessageContext;
     }
 
-    public interface Error {
-        Object getLastExpectedMessage();
-        Object getCurrentExpectedMessage();
-        List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+    void processed(final Object message, final long messageElapseTimeInNanos) {
+        if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
+            messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
+                messageElapseTimeInNanos));
+        }
     }
 
-    private class FailedExpectation implements Error {
+    public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
+        return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
+    }
 
+    private static final class FailedExpectation implements Error {
         private final Object lastExpectedMessage;
         private final Object currentExpectedMessage;
         private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
         private final long expectedTimeInMillis;
         private final long actualTimeInMillis;
 
-        public FailedExpectation(Object lastExpectedMessage, Object message, List<MessageProcessingTime> messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) {
+        FailedExpectation(final Object lastExpectedMessage, final Object message,
+                final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
+                final long actualTimeNanos) {
             this.lastExpectedMessage = lastExpectedMessage;
             this.currentExpectedMessage = message;
-            this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage;
-            this.expectedTimeInMillis = expectedTimeInMillis;
-            this.actualTimeInMillis = actualTimeInMillis;
+            this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
+            this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
+            this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
         }
 
+        @Override
         public Object getLastExpectedMessage() {
             return lastExpectedMessage;
         }
 
+        @Override
         public Object getCurrentExpectedMessage() {
             return currentExpectedMessage;
         }
 
+        @Override
         public List<MessageProcessingTime>  getMessageProcessingTimesSinceLastExpectedMessage() {
             return messagesSinceLastExpectedMessage;
         }
@@ -182,81 +224,78 @@ public final class MessageTracker {
         @Override
         public String toString() {
             StringBuilder builder = new StringBuilder();
-            builder.append("\n> Last Expected Message = " + lastExpectedMessage);
-            builder.append("\n> Current Expected Message = " + currentExpectedMessage);
-            builder.append("\n> Expected time in between messages = " + expectedTimeInMillis);
-            builder.append("\n> Actual time in between messages = " + actualTimeInMillis);
+            builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
+            builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
+            builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
+            builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
             for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
-                builder.append("\n\t> ").append(time.toString());
+                builder.append("\n\t> ").append(time);
             }
             return builder.toString();
         }
-
-    }
-
-    public interface Context {
-        Context done();
-        Optional<? extends Error> error();
     }
 
-    private static class NoOpContext implements Context {
+    private abstract class AbstractTimedContext extends Context {
+        abstract Object message();
 
-        @Override
-        public Context done() {
-            return this;
-        }
+        abstract Stopwatch stopTimer();
 
         @Override
-        public Optional<Error> error() {
-            return Optional.absent();
+        public final void close() {
+            processed(message(), stopTimer().elapsed(NANOSECONDS));
         }
     }
 
-    private class CurrentMessageContext implements Context {
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        boolean done = true;
+    private final class CurrentMessageContext extends AbstractTimedContext {
+        private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
+        private Object message;
+
+        void reset(final Object newMessage) {
+            this.message = Preconditions.checkNotNull(newMessage);
+            Preconditions.checkState(!stopwatch.isRunning(),
+                "Trying to reset a context that is not done (%s). currentMessage = %s", this, newMessage);
+            stopwatch.start();
+        }
 
-        public void reset(){
-            Preconditions.checkState(done,
-                    String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage));
-            done = false;
-            stopwatch.reset().start();
+        @Override
+        Object message() {
+            return message;
         }
 
         @Override
-        public Context done() {
-            processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS));
-            done = true;
-            return this;
+        Stopwatch stopTimer() {
+            return stopwatch.stop();
         }
 
         @Override
-        public Optional<? extends Error> error() {
+        public Optional<Error> error() {
             return Optional.absent();
         }
     }
 
-    private class ErrorContext implements Context {
-        Object message;
-        private final Optional<? extends Error> error;
-        Stopwatch stopwatch;
+    private final class ErrorContext extends AbstractTimedContext {
+        private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
+        private final Object message;
+        private final Error error;
+
+        ErrorContext(final Object message, final Error error) {
+            this.message = Preconditions.checkNotNull(message);
+            this.error = Preconditions.checkNotNull(error);
+        }
 
-        ErrorContext(Object message, Optional<? extends Error> error){
-            this.message = message;
-            this.error = error;
-            this.stopwatch = Stopwatch.createStarted();
+        @Override
+        Object message() {
+            return message;
         }
 
         @Override
-        public Context done(){
-            processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS));
-            this.stopwatch.stop();
-            return this;
+        Stopwatch stopTimer() {
+            return stopwatch.stop();
         }
 
         @Override
-        public Optional<? extends Error> error() {
-            return error;
+        public Optional<Error> error() {
+            return Optional.of(error);
         }
     }
 }