/* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.common.actor; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; /** * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that * received between the arrival of two instances of the same message and the amount of time it took to process each * of those messages. *
* Usage of the API is as follows, *
 *
 *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
 *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
 *
 *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
 *     // will do nothing
 *     tracker.begin();
 *
 *     .....
 *
 *     MessageTracker.Context context = tracker.received(message);
 *
 *     if(context.error().isPresent()){
 *         LOG.error("{}", context.error().get());
 *     }
 *
 *     // Some custom processing
 *     process(message);
 *
 *     context.done();
 *
 * 
*/ public final class MessageTracker { private static final Context NO_OP_CONTEXT = new NoOpContext(); private final Class expectedMessageClass; private final long expectedArrivalInterval; private final List messagesSinceLastExpectedMessage = new LinkedList<>(); private Stopwatch expectedMessageWatch; private boolean enabled = false; private Object lastExpectedMessage; private Object currentMessage; private final CurrentMessageContext currentMessageContext = new CurrentMessageContext(); /** * * @param expectedMessageClass The class of the message to track * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected * message */ public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){ this.expectedMessageClass = expectedMessageClass; this.expectedArrivalInterval = expectedArrivalIntervalInMillis; } public void begin(){ if(enabled) { return; } enabled = true; expectedMessageWatch = Stopwatch.createStarted(); } public Context received(Object message){ if(!enabled) { return NO_OP_CONTEXT; } this.currentMessage = message; if(expectedMessageClass.isInstance(message)){ long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS); if(actualElapsedTime > expectedArrivalInterval){ return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message, ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval, actualElapsedTime))); } this.lastExpectedMessage = message; this.messagesSinceLastExpectedMessage.clear(); } currentMessageContext.reset(); return currentMessageContext; } private void processed(Object message, long messageElapseTimeInNanos){ if(!enabled) { return; } if(!expectedMessageClass.isInstance(message)){ this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos)); } } public List getMessagesSinceLastExpectedMessage(){ return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage); } public static class MessageProcessingTime { private final Class messageClass; private final long elapsedTimeInNanos; MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){ this.messageClass = messageClass; this.elapsedTimeInNanos = elapsedTimeInNanos; } @Override public String toString() { return "MessageProcessingTime{" + "messageClass=" + messageClass.getSimpleName() + ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) + '}'; } public Class getMessageClass() { return messageClass; } public long getElapsedTimeInNanos() { return elapsedTimeInNanos; } } public interface Error { Object getLastExpectedMessage(); Object getCurrentExpectedMessage(); List getMessageProcessingTimesSinceLastExpectedMessage(); } private class FailedExpectation implements Error { private final Object lastExpectedMessage; private final Object currentExpectedMessage; private final List messagesSinceLastExpectedMessage; private final long expectedTimeInMillis; private final long actualTimeInMillis; public FailedExpectation(Object lastExpectedMessage, Object message, List messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) { this.lastExpectedMessage = lastExpectedMessage; this.currentExpectedMessage = message; this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage; this.expectedTimeInMillis = expectedTimeInMillis; this.actualTimeInMillis = actualTimeInMillis; } public Object getLastExpectedMessage() { return lastExpectedMessage; } public Object getCurrentExpectedMessage() { return currentExpectedMessage; } public List getMessageProcessingTimesSinceLastExpectedMessage() { return messagesSinceLastExpectedMessage; } @Override public String toString() { StringBuilder builder = new StringBuilder(); builder.append("\n> Last Expected Message = " + lastExpectedMessage); builder.append("\n> Current Expected Message = " + currentExpectedMessage); builder.append("\n> Expected time in between messages = " + expectedTimeInMillis); builder.append("\n> Actual time in between messages = " + actualTimeInMillis); for (MessageProcessingTime time : messagesSinceLastExpectedMessage) { builder.append("\n\t> ").append(time.toString()); } return builder.toString(); } } public interface Context { Context done(); Optional error(); } private static class NoOpContext implements Context { @Override public Context done() { return this; } @Override public Optional error() { return Optional.absent(); } } private class CurrentMessageContext implements Context { Stopwatch stopwatch = Stopwatch.createStarted(); boolean done = true; public void reset(){ Preconditions.checkState(done, String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage)); done = false; stopwatch.reset().start(); } @Override public Context done() { processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS)); done = true; return this; } @Override public Optional error() { return Optional.absent(); } } private class ErrorContext implements Context { Object message; private final Optional error; Stopwatch stopwatch; ErrorContext(Object message, Optional error){ this.message = message; this.error = error; this.stopwatch = Stopwatch.createStarted(); } @Override public Context done(){ processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS)); this.stopwatch.stop(); return this; } @Override public Optional error() { return error; } } }