2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.common.actor;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
13 import static java.util.concurrent.TimeUnit.MILLISECONDS;
14 import static java.util.concurrent.TimeUnit.NANOSECONDS;
16 import com.google.common.annotations.Beta;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.base.Stopwatch;
19 import com.google.common.base.Ticker;
20 import com.google.common.collect.ImmutableList;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Optional;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
 * MessageTracker is a diagnostic utility class for figuring out why a message that was expected to arrive within a
 * given time interval did not arrive. It keeps track of all the messages that were received between the arrival of
 * two instances of the expected message and of the amount of time it took to process each of them.
33 * Usage of the API is as follows,
 * // Track the Foo class. Here we expect to see a message of type Foo come in every 10 millis
37 * MessageTracker tracker = new MessageTracker(Foo.class, 10);
 * // Begin the tracking process. If this is not called, received() returns a no-op Context and nothing is tracked
45 * try (MessageTracker.Context context = tracker.received(message)) {
47 * if (context.error().isPresent()){
48 * LOG.error("{}", context.error().get());
51 * // Some custom processing
58 * This class is NOT thread-safe.
61 public final class MessageTracker {
62 public abstract static class Context implements AutoCloseable {
64 // Hidden to prevent outside instantiation
67 public abstract Optional<Error> error();
70 public abstract void close();
73 public interface Error {
74 Object getLastExpectedMessage();
76 Object getCurrentExpectedMessage();
78 List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
82 public static final class MessageProcessingTime {
83 private final Class<?> messageClass;
84 private final long elapsedTimeInNanos;
86 MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
87 this.messageClass = requireNonNull(messageClass);
88 this.elapsedTimeInNanos = elapsedTimeInNanos;
92 public String toString() {
93 return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis="
94 + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]";
98 public Class<?> getMessageClass() {
102 public long getElapsedTimeInNanos() {
103 return elapsedTimeInNanos;
107 private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
108 private static final Context NO_OP_CONTEXT = new Context() {
110 public void close() {
115 public Optional<Error> error() {
116 return Optional.empty();
120 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
122 private final CurrentMessageContext currentMessageContext;
124 private final Stopwatch expectedMessageWatch;
126 private final Class<?> expectedMessageClass;
128 private final long expectedArrivalInterval;
130 private final Ticker ticker;
132 private Object lastExpectedMessage;
/**
 * Constructs an instance with an explicit time source.
 *
 * @param expectedMessageClass the class of the message to track, must not be null
 * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected
 *        message, in milliseconds; must not be negative
 * @param ticker the time source used by all stopwatches of this tracker, must not be null
 * @throws IllegalArgumentException if {@code expectedArrivalIntervalInMillis} is negative
 * @throws NullPointerException if {@code expectedMessageClass} or {@code ticker} is null
 */
@VisibleForTesting
MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
        final Ticker ticker) {
    // Say which value was rejected instead of throwing a message-less IllegalArgumentException.
    checkArgument(expectedArrivalIntervalInMillis >= 0,
        "expectedArrivalIntervalInMillis must not be negative: %s", expectedArrivalIntervalInMillis);
    this.expectedMessageClass = requireNonNull(expectedMessageClass);
    // The interval is kept in nanoseconds so it can be compared directly with stopwatch readings.
    this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
    this.ticker = requireNonNull(ticker);
    this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
    // Must come after the ticker assignment: the context's field initializer reads this.ticker.
    this.currentMessageContext = new CurrentMessageContext();
}
/**
 * Constructs an instance that measures time with the system ticker.
 *
 * @param expectedMessageClass the class of the message to track
 * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected
 *        message, in milliseconds
 */
public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
    this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
}
156 public void begin() {
157 if (!expectedMessageWatch.isRunning()) {
158 LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
159 expectedMessageWatch.start();
163 public Context received(final Object message) {
164 if (!expectedMessageWatch.isRunning()) {
165 return NO_OP_CONTEXT;
168 if (expectedMessageClass.isInstance(message)) {
169 final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
170 if (actualElapsedTime > expectedArrivalInterval) {
171 return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
172 messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
174 lastExpectedMessage = message;
175 messagesSinceLastExpectedMessage.clear();
176 expectedMessageWatch.reset().start();
179 currentMessageContext.reset(message);
180 return currentMessageContext;
183 void processed(final Object message, final long messageElapseTimeInNanos) {
184 if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
185 messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
186 messageElapseTimeInNanos));
190 public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
191 return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
194 private static final class FailedExpectation implements Error {
195 private final Object lastExpectedMessage;
196 private final Object currentExpectedMessage;
197 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
198 private final long expectedTimeInMillis;
199 private final long actualTimeInMillis;
201 FailedExpectation(final Object lastExpectedMessage, final Object message,
202 final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
203 final long actualTimeNanos) {
204 this.lastExpectedMessage = lastExpectedMessage;
205 this.currentExpectedMessage = message;
206 this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
207 this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
208 this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
212 public Object getLastExpectedMessage() {
213 return lastExpectedMessage;
217 public Object getCurrentExpectedMessage() {
218 return currentExpectedMessage;
222 public List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage() {
223 return messagesSinceLastExpectedMessage;
227 public String toString() {
228 StringBuilder builder = new StringBuilder()
229 .append("\n> Last Expected Message = ").append(lastExpectedMessage)
230 .append("\n> Current Expected Message = ").append(currentExpectedMessage)
231 .append("\n> Expected time in between messages = ").append(expectedTimeInMillis)
232 .append("\n> Actual time in between messages = ").append(actualTimeInMillis);
233 for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
234 builder.append("\n\t> ").append(time);
236 return builder.toString();
240 private abstract class AbstractTimedContext extends Context {
241 abstract Object message();
243 abstract Stopwatch stopTimer();
246 public final void close() {
247 processed(message(), stopTimer().elapsed(NANOSECONDS));
251 private final class CurrentMessageContext extends AbstractTimedContext {
252 private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
253 private Object message;
255 void reset(final Object newMessage) {
256 this.message = requireNonNull(newMessage);
257 checkState(!stopwatch.isRunning(), "Trying to reset a context that is not done (%s). currentMessage = %s",
268 Stopwatch stopTimer() {
269 return stopwatch.stop();
273 public Optional<Error> error() {
274 return Optional.empty();
278 private final class ErrorContext extends AbstractTimedContext {
279 private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
280 private final Object message;
281 private final Error error;
283 ErrorContext(final Object message, final Error error) {
284 this.message = requireNonNull(message);
285 this.error = requireNonNull(error);
294 Stopwatch stopTimer() {
295 return stopwatch.stop();
299 public Optional<Error> error() {
300 return Optional.of(error);