* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.common.actor;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import java.util.LinkedList;
import java.util.List;
-import javax.annotation.concurrent.NotThreadSafe;
+import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
* received between the arrival of two instances of the same message and the amount of time it took to process each
* of those messages.
- * <br/>
+ * <br>
* Usage of the API is as follows,
* <pre>
*
*
* .....
*
- * MessageTracker.Context context = tracker.received(message);
- *
- * if(context.error().isPresent()){
- * LOG.error("{}", context.error().get());
- * }
+ * try (MessageTracker.Context context = tracker.received(message)) {
*
- * // Some custom processing
- * process(message);
+ * if (context.error().isPresent()){
+ * LOG.error("{}", context.error().get());
+ * }
*
- * context.done();
+ * // Some custom processing
+ * process(message);
+ * }
*
* </pre>
+ *
+ * <p>
+ * This class is NOT thread-safe.
*/
@Beta
-@NotThreadSafe
public final class MessageTracker {
- public static abstract class Context {
+ public abstract static class Context implements AutoCloseable {
Context() {
// Hidden to prevent outside instantiation
}
- public abstract Context done();
public abstract Optional<Error> error();
+
+ @Override
+ public abstract void close();
}
public interface Error {
Object getLastExpectedMessage();
+
Object getCurrentExpectedMessage();
+
List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
}
private final long elapsedTimeInNanos;
MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
- this.messageClass = Preconditions.checkNotNull(messageClass);
+ this.messageClass = requireNonNull(messageClass);
this.elapsedTimeInNanos = elapsedTimeInNanos;
}
@Override
public String toString() {
- return "MessageProcessingTime{" +
- "messageClass=" + messageClass.getSimpleName() +
- ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) +
- '}';
+ return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis="
+ + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]";
}
+
public Class<?> getMessageClass() {
return messageClass;
}
private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
private static final Context NO_OP_CONTEXT = new Context() {
@Override
- public Context done() {
- return this;
+ public void close() {
+ // No-op
}
@Override
public Optional<Error> error() {
- return Optional.absent();
+ return Optional.empty();
}
};
@VisibleForTesting
MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
final Ticker ticker) {
- Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
- this.expectedMessageClass = Preconditions.checkNotNull(expectedMessageClass);
+ checkArgument(expectedArrivalIntervalInMillis >= 0);
+ this.expectedMessageClass = requireNonNull(expectedMessageClass);
this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
- this.ticker = Preconditions.checkNotNull(ticker);
+ this.ticker = requireNonNull(ticker);
this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
this.currentMessageContext = new CurrentMessageContext();
}
/**
+ * Constructs an instance.
*
- * @param expectedMessageClass The class of the message to track
- * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
+ * @param expectedMessageClass the class of the message to track
+ * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected
* message
*/
public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
- builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
- builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
- builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
+ StringBuilder builder = new StringBuilder()
+ .append("\n> Last Expected Message = ").append(lastExpectedMessage)
+ .append("\n> Current Expected Message = ").append(currentExpectedMessage)
+ .append("\n> Expected time in between messages = ").append(expectedTimeInMillis)
+ .append("\n> Actual time in between messages = ").append(actualTimeInMillis);
for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
builder.append("\n\t> ").append(time);
}
private abstract class AbstractTimedContext extends Context {
abstract Object message();
+
abstract Stopwatch stopTimer();
@Override
- public final Context done() {
+ public final void close() {
processed(message(), stopTimer().elapsed(NANOSECONDS));
- return this;
}
}
private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
private Object message;
- void reset(final Object message) {
- this.message = Preconditions.checkNotNull(message);
- Preconditions.checkState(!stopwatch.isRunning(),
- "Trying to reset a context that is not done (%s). currentMessage = %s", this, message);
+ void reset(final Object newMessage) {
+ this.message = requireNonNull(newMessage);
+ checkState(!stopwatch.isRunning(), "Trying to reset a context that is not done (%s). currentMessage = %s",
+ this, newMessage);
stopwatch.start();
}
@Override
public Optional<Error> error() {
- return Optional.absent();
+ return Optional.empty();
}
}
private final Error error;
ErrorContext(final Object message, final Error error) {
- this.message = Preconditions.checkNotNull(message);
- this.error = Preconditions.checkNotNull(error);
+ this.message = requireNonNull(message);
+ this.error = requireNonNull(error);
}
@Override