package org.opendaylight.controller.cluster.common.actor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
*
* </pre>
*/
+@Beta
+@NotThreadSafe
public final class MessageTracker {
+ public static abstract class Context {
+ Context() {
+ // Hidden to prevent outside instantiation
+ }
- private static final Context NO_OP_CONTEXT = new NoOpContext();
+ public abstract Context done();
+ public abstract Optional<Error> error();
+ }
- private final Class<?> expectedMessageClass;
+ public interface Error {
+ Object getLastExpectedMessage();
+ Object getCurrentExpectedMessage();
+ List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+ }
- private final long expectedArrivalInterval;
+
+ public static final class MessageProcessingTime {
+ private final Class<?> messageClass;
+ private final long elapsedTimeInNanos;
+
+ MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
+ this.messageClass = Preconditions.checkNotNull(messageClass);
+ this.elapsedTimeInNanos = elapsedTimeInNanos;
+ }
+
+ @Override
+ public String toString() {
+ return "MessageProcessingTime{" +
+ "messageClass=" + messageClass.getSimpleName() +
+ ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) +
+ '}';
+ }
+
+ public Class<?> getMessageClass() {
+ return messageClass;
+ }
+
+ public long getElapsedTimeInNanos() {
+ return elapsedTimeInNanos;
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
+ private static final Context NO_OP_CONTEXT = new Context() {
+ @Override
+ public Context done() {
+ return this;
+ }
+
+ @Override
+ public Optional<Error> error() {
+ return Optional.absent();
+ }
+ };
private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
- private Stopwatch expectedMessageWatch;
+ private final CurrentMessageContext currentMessageContext;
- private boolean enabled = false;
+ private final Stopwatch expectedMessageWatch;
- private Object lastExpectedMessage;
+ private final Class<?> expectedMessageClass;
+
+ private final long expectedArrivalInterval;
- private Object currentMessage;
+ private final Ticker ticker;
- private final CurrentMessageContext currentMessageContext = new CurrentMessageContext();
+ private Object lastExpectedMessage;
+
+ @VisibleForTesting
+ MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
+ final Ticker ticker) {
+ Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
+ this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
+ this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
+ this.ticker = Preconditions.checkNotNull(ticker);
+ this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
+ this.currentMessageContext = new CurrentMessageContext();
+ }
/**
*
* @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
* message
*/
- public MessageTracker(Class<?> expectedMessageClass, long expectedArrivalIntervalInMillis){
- this.expectedMessageClass = expectedMessageClass;
- this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
+ public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
+ this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
}
- public void begin(){
- if(enabled) {
- return;
+ public void begin() {
+ if (!expectedMessageWatch.isRunning()) {
+ LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
+ expectedMessageWatch.start();
}
- enabled = true;
- expectedMessageWatch = Stopwatch.createStarted();
}
- public Context received(Object message){
- if(!enabled) {
+ public Context received(final Object message) {
+ if (!expectedMessageWatch.isRunning()) {
return NO_OP_CONTEXT;
}
- this.currentMessage = message;
- if(expectedMessageClass.isInstance(message)){
- long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS);
- if(actualElapsedTime > expectedArrivalInterval){
- return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message,
- ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval,
- actualElapsedTime)));
+
+ if (expectedMessageClass.isInstance(message)) {
+ final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
+ if (actualElapsedTime > expectedArrivalInterval) {
+ return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
+ messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
}
- this.lastExpectedMessage = message;
- this.messagesSinceLastExpectedMessage.clear();
+ lastExpectedMessage = message;
+ messagesSinceLastExpectedMessage.clear();
+ expectedMessageWatch.reset().start();
}
- currentMessageContext.reset();
+ currentMessageContext.reset(message);
return currentMessageContext;
}
- private void processed(Object message, long messageElapseTimeInNanos){
- if(!enabled) {
- return;
- }
- if(!expectedMessageClass.isInstance(message)){
- this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos));
- }
- }
-
- public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage(){
- return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage);
- }
-
- public static class MessageProcessingTime {
- private final Class<?> messageClass;
- private final long elapsedTimeInNanos;
-
- MessageProcessingTime(Class<?> messageClass, long elapsedTimeInNanos){
- this.messageClass = messageClass;
- this.elapsedTimeInNanos = elapsedTimeInNanos;
- }
-
- @Override
- public String toString() {
- return "MessageProcessingTime{" +
- "messageClass=" + messageClass.getSimpleName() +
- ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) +
- '}';
- }
-
- public Class<?> getMessageClass() {
- return messageClass;
- }
-
- public long getElapsedTimeInNanos() {
- return elapsedTimeInNanos;
+ void processed(final Object message, final long messageElapseTimeInNanos) {
+ if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
+ messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
+ messageElapseTimeInNanos));
}
}
- public interface Error {
- Object getLastExpectedMessage();
- Object getCurrentExpectedMessage();
- List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+ public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
+ return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
}
- private class FailedExpectation implements Error {
-
+ private static final class FailedExpectation implements Error {
private final Object lastExpectedMessage;
private final Object currentExpectedMessage;
private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
private final long expectedTimeInMillis;
private final long actualTimeInMillis;
- public FailedExpectation(Object lastExpectedMessage, Object message, List<MessageProcessingTime> messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) {
+ FailedExpectation(final Object lastExpectedMessage, final Object message,
+ final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
+ final long actualTimeNanos) {
this.lastExpectedMessage = lastExpectedMessage;
this.currentExpectedMessage = message;
- this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage;
- this.expectedTimeInMillis = expectedTimeInMillis;
- this.actualTimeInMillis = actualTimeInMillis;
+ this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
+ this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
+ this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
}
+ @Override
public Object getLastExpectedMessage() {
return lastExpectedMessage;
}
+ @Override
public Object getCurrentExpectedMessage() {
return currentExpectedMessage;
}
+ @Override
public List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage() {
return messagesSinceLastExpectedMessage;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("\n> Last Expected Message = " + lastExpectedMessage);
- builder.append("\n> Current Expected Message = " + currentExpectedMessage);
- builder.append("\n> Expected time in between messages = " + expectedTimeInMillis);
- builder.append("\n> Actual time in between messages = " + actualTimeInMillis);
+ builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
+ builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
+ builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
+ builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
- builder.append("\n\t> ").append(time.toString());
+ builder.append("\n\t> ").append(time);
}
return builder.toString();
}
-
}
- public interface Context {
- Context done();
- Optional<? extends Error> error();
- }
-
- private static class NoOpContext implements Context {
+ private abstract class AbstractTimedContext extends Context {
+ abstract Object message();
+ abstract Stopwatch stopTimer();
@Override
- public Context done() {
+ public final Context done() {
+ processed(message(), stopTimer().elapsed(NANOSECONDS));
return this;
}
-
- @Override
- public Optional<Error> error() {
- return Optional.absent();
- }
}
- private class CurrentMessageContext implements Context {
- Stopwatch stopwatch = Stopwatch.createStarted();
- boolean done = true;
+ private final class CurrentMessageContext extends AbstractTimedContext {
+ private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
+ private Object message;
- public void reset(){
- Preconditions.checkState(done,
- String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage));
- done = false;
- stopwatch.reset().start();
+ void reset(final Object message) {
+ this.message = Preconditions.checkNotNull(message);
+ Preconditions.checkState(!stopwatch.isRunning(),
+ "Trying to reset a context that is not done (%s). currentMessage = %s", this, message);
+ stopwatch.start();
}
@Override
- public Context done() {
- processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS));
- done = true;
- return this;
+ Object message() {
+ return message;
+ }
+
+ @Override
+ Stopwatch stopTimer() {
+ return stopwatch.stop();
}
@Override
- public Optional<? extends Error> error() {
+ public Optional<Error> error() {
return Optional.absent();
}
}
- private class ErrorContext implements Context {
- Object message;
- private final Optional<? extends Error> error;
- Stopwatch stopwatch;
+ private final class ErrorContext extends AbstractTimedContext {
+ private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
+ private final Object message;
+ private final Error error;
- ErrorContext(Object message, Optional<? extends Error> error){
- this.message = message;
- this.error = error;
- this.stopwatch = Stopwatch.createStarted();
+ ErrorContext(final Object message, final Error error) {
+ this.message = Preconditions.checkNotNull(message);
+ this.error = Preconditions.checkNotNull(error);
}
@Override
- public Context done(){
- processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS));
- this.stopwatch.stop();
- return this;
+ Object message() {
+ return message;
+ }
+
+ @Override
+ Stopwatch stopTimer() {
+ return stopwatch.stop();
}
@Override
- public Optional<? extends Error> error() {
- return error;
+ public Optional<Error> error() {
+ return Optional.of(error);
}
}
}
package org.opendaylight.controller.cluster.common.actor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.common.base.Ticker;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageTrackerTest {
+ private static final class TestTicker extends Ticker {
+ private long ticks;
+
+ @Override
+ public long read() {
+ return ticks;
+ }
+
+ void increment(final long ticks) {
+ this.ticks += ticks;
+ }
+ }
+
+ private static final class Foo {
+ // Intentionally empty
+ }
+
private final static Logger LOG = LoggerFactory.getLogger(MessageTrackerTest.class);
- private class Foo {
+ private TestTicker ticker;
+ private MessageTracker messageTracker;
+ @Before
+ public void setup() {
+ ticker = new TestTicker();
+ messageTracker = new MessageTracker(Foo.class, 10, ticker);
}
@Test
public void testNoTracking() {
- MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
-
MessageTracker.Context context1 = messageTracker.received(new Foo());
context1.done();
- Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
-
+ ticker.increment(MILLISECONDS.toNanos(20));
MessageTracker.Context context2 = messageTracker.received(new Foo());
context2.done();
-
}
@Test
public void testFailedExpectationOnTracking() {
- MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
messageTracker.begin();
MessageTracker.Context context1 = messageTracker.received(new Foo());
context1.done();
- Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+ ticker.increment(MILLISECONDS.toNanos(20));
MessageTracker.Context context2 = messageTracker.received(new Foo());
Assert.assertEquals(true, context2.error().isPresent());
@Test
public void testFailedExpectationOnTrackingWithMessagesInBetween() {
- MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
messageTracker.begin();
MessageTracker.Context context1 = messageTracker.received(new Foo());
context1.done();
messageTracker.received("A").done();
- messageTracker.received(Long.valueOf(10)).done();
- MessageTracker.Context c = messageTracker.received(Integer.valueOf(100));
+ messageTracker.received(10L).done();
+ MessageTracker.Context c = messageTracker.received(100);
- Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+ ticker.increment(MILLISECONDS.toNanos(20));
c.done();
Assert.assertEquals(String.class, messageProcessingTimes.get(0).getMessageClass());
Assert.assertEquals(Long.class, messageProcessingTimes.get(1).getMessageClass());
Assert.assertEquals(Integer.class, messageProcessingTimes.get(2).getMessageClass());
- Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > TimeUnit.MILLISECONDS.toNanos(10));
+ Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > MILLISECONDS.toNanos(10));
Assert.assertEquals(Foo.class, error.getLastExpectedMessage().getClass());
Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass());
@Test
public void testMetExpectationOnTracking() {
- MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
messageTracker.begin();
MessageTracker.Context context1 = messageTracker.received(new Foo());
context1.done();
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+ ticker.increment(MILLISECONDS.toNanos(1));
MessageTracker.Context context2 = messageTracker.received(new Foo());
Assert.assertEquals(false, context2.error().isPresent());
@Test
public void testIllegalStateExceptionWhenDoneIsNotCalledWhileTracking() {
- MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
messageTracker.begin();
messageTracker.received(new Foo());
@Test
public void testNoIllegalStateExceptionWhenDoneIsNotCalledWhileNotTracking() {
- MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
-
messageTracker.received(new Foo());
messageTracker.received(new Foo());
}
@Test
public void testDelayInFirstExpectedMessageArrival(){
-
- MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
messageTracker.begin();
- Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+ ticker.increment(MILLISECONDS.toNanos(20));
MessageTracker.Context context = messageTracker.received(new Foo());
@Test
public void testCallingBeginDoesNotResetWatch() {
- MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
messageTracker.begin();
- Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+ ticker.increment(MILLISECONDS.toNanos(20));
messageTracker.begin();
@Test
public void testMessagesSinceLastExpectedMessage() {
- MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
messageTracker.begin();
MessageTracker.Context context1 = messageTracker.received(Integer.valueOf(45)).done();