/* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.common.actor; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import java.util.LinkedList; import java.util.List; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that * received between the arrival of two instances of the same message and the amount of time it took to process each * of those messages. *
* Usage of the API is as follows, *
 *
 *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
 *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
 *
 *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
 *     // will do nothing
 *     tracker.begin();
 *
 *     .....
 *
 *     try (MessageTracker.Context context = tracker.received(message)) {
 *
 *         if (context.error().isPresent()){
 *             LOG.error("{}", context.error().get());
 *         }
 *
 *         // Some custom processing
 *         process(message);
 *     }
 *
 * 
* *

* This class is NOT thread-safe. */ @Beta public final class MessageTracker { public abstract static class Context implements AutoCloseable { Context() { // Hidden to prevent outside instantiation } public abstract Optional error(); @Override public abstract void close(); } public interface Error { Object getLastExpectedMessage(); Object getCurrentExpectedMessage(); List getMessageProcessingTimesSinceLastExpectedMessage(); } public static final class MessageProcessingTime { private final Class messageClass; private final long elapsedTimeInNanos; MessageProcessingTime(final Class messageClass, final long elapsedTimeInNanos) { this.messageClass = requireNonNull(messageClass); this.elapsedTimeInNanos = elapsedTimeInNanos; } @Override public String toString() { return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis=" + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]"; } public Class getMessageClass() { return messageClass; } public long getElapsedTimeInNanos() { return elapsedTimeInNanos; } } private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class); private static final Context NO_OP_CONTEXT = new Context() { @Override public void close() { // No-op } @Override public Optional error() { return Optional.empty(); } }; private final List messagesSinceLastExpectedMessage = new LinkedList<>(); private final CurrentMessageContext currentMessageContext; private final Stopwatch expectedMessageWatch; private final Class expectedMessageClass; private final long expectedArrivalInterval; private final Ticker ticker; private Object lastExpectedMessage; @VisibleForTesting MessageTracker(final Class expectedMessageClass, final long expectedArrivalIntervalInMillis, final Ticker ticker) { checkArgument(expectedArrivalIntervalInMillis >= 0); this.expectedMessageClass = requireNonNull(expectedMessageClass); this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis); this.ticker = requireNonNull(ticker); this.expectedMessageWatch = Stopwatch.createUnstarted(ticker); this.currentMessageContext = new CurrentMessageContext(); } /** * Constructs an instance. * * @param expectedMessageClass the class of the message to track * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected * message */ public MessageTracker(final Class expectedMessageClass, final long expectedArrivalIntervalInMillis) { this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker()); } public void begin() { if (!expectedMessageWatch.isRunning()) { LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval); expectedMessageWatch.start(); } } public Context received(final Object message) { if (!expectedMessageWatch.isRunning()) { return NO_OP_CONTEXT; } if (expectedMessageClass.isInstance(message)) { final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS); if (actualElapsedTime > expectedArrivalInterval) { return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message, messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime)); } lastExpectedMessage = message; messagesSinceLastExpectedMessage.clear(); expectedMessageWatch.reset().start(); } currentMessageContext.reset(message); return currentMessageContext; } void processed(final Object message, final long messageElapseTimeInNanos) { if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) { messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos)); } } public List getMessagesSinceLastExpectedMessage() { return ImmutableList.copyOf(messagesSinceLastExpectedMessage); } private static final class FailedExpectation implements Error { private final Object lastExpectedMessage; private final Object currentExpectedMessage; private final List messagesSinceLastExpectedMessage; private final long expectedTimeInMillis; private final long actualTimeInMillis; FailedExpectation(final Object lastExpectedMessage, final Object message, final List messagesSinceLastExpectedMessage, final long expectedTimeNanos, final long actualTimeNanos) { this.lastExpectedMessage = lastExpectedMessage; this.currentExpectedMessage = message; this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage); this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos); this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos); } @Override public Object getLastExpectedMessage() { return lastExpectedMessage; } @Override public Object getCurrentExpectedMessage() { return currentExpectedMessage; } @Override public List getMessageProcessingTimesSinceLastExpectedMessage() { return messagesSinceLastExpectedMessage; } @Override public String toString() { StringBuilder builder = new StringBuilder() .append("\n> Last Expected Message = ").append(lastExpectedMessage) .append("\n> Current Expected Message = ").append(currentExpectedMessage) .append("\n> Expected time in between messages = ").append(expectedTimeInMillis) .append("\n> Actual time in between messages = ").append(actualTimeInMillis); for (MessageProcessingTime time : messagesSinceLastExpectedMessage) { builder.append("\n\t> ").append(time); } return builder.toString(); } } private abstract class AbstractTimedContext extends Context { abstract Object message(); abstract Stopwatch stopTimer(); @Override public final void close() { processed(message(), stopTimer().elapsed(NANOSECONDS)); } } private final class CurrentMessageContext extends AbstractTimedContext { private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker); private Object message; void reset(final Object newMessage) { this.message = requireNonNull(newMessage); checkState(!stopwatch.isRunning(), "Trying to reset a context that is not done (%s). currentMessage = %s", this, newMessage); stopwatch.start(); } @Override Object message() { return message; } @Override Stopwatch stopTimer() { return stopwatch.stop(); } @Override public Optional error() { return Optional.empty(); } } private final class ErrorContext extends AbstractTimedContext { private final Stopwatch stopwatch = Stopwatch.createStarted(ticker); private final Object message; private final Error error; ErrorContext(final Object message, final Error error) { this.message = requireNonNull(message); this.error = requireNonNull(error); } @Override Object message() { return message; } @Override Stopwatch stopTimer() { return stopwatch.stop(); } @Override public Optional error() { return Optional.of(error); } } }