import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
* expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
* received between the arrival of two instances of the same message and the amount of time it took to process each
* of those messages.
- * <br/>
+ * <br>
* Usage of the API is as follows,
* <pre>
*
*
* .....
*
- * MessageTracker.Context context = tracker.received(message);
- *
- * if(context.error().isPresent()){
- * LOG.error("{}", context.error().get());
- * }
+ * try (MessageTracker.Context context = tracker.received(message)) {
*
- * // Some custom processing
- * process(message);
+ * if (context.error().isPresent()){
+ * LOG.error("{}", context.error().get());
+ * }
*
- * context.done();
+ * // Some custom processing
+ * process(message);
+ * }
*
* </pre>
*/
@Beta
@NotThreadSafe
public final class MessageTracker {
- public static abstract class Context {
+ public abstract static class Context implements AutoCloseable {
Context() {
// Hidden to prevent outside instantiation
}
- public abstract Context done();
public abstract Optional<Error> error();
+
+ @Override
+ public abstract void close();
}
public interface Error {
Object getLastExpectedMessage();
+
Object getCurrentExpectedMessage();
+
List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
}
@Override
public String toString() {
- return "MessageProcessingTime{" +
- "messageClass=" + messageClass.getSimpleName() +
- ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) +
- '}';
+ return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis="
+ + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]";
}
+
public Class<?> getMessageClass() {
return messageClass;
}
private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
private static final Context NO_OP_CONTEXT = new Context() {
@Override
- public Context done() {
- return this;
+ public void close() {
+ // No-op
}
@Override
}
/**
+ * Constructs an instance.
*
- * @param expectedMessageClass The class of the message to track
- * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
+ * @param expectedMessageClass the class of the message to track
+ * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected
* message
*/
public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
private abstract class AbstractTimedContext extends Context {
abstract Object message();
+
abstract Stopwatch stopTimer();
@Override
- public final Context done() {
+ public final void close() {
processed(message(), stopTimer().elapsed(NANOSECONDS));
- return this;
}
}
private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
private Object message;
- void reset(final Object message) {
- this.message = Preconditions.checkNotNull(message);
+ void reset(final Object newMessage) {
+ this.message = Preconditions.checkNotNull(newMessage);
Preconditions.checkState(!stopwatch.isRunning(),
- "Trying to reset a context that is not done (%s). currentMessage = %s", this, message);
+ "Trying to reset a context that is not done (%s). currentMessage = %s", this, newMessage);
stopwatch.start();
}