2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.common.actor;
11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
12 import static java.util.concurrent.TimeUnit.NANOSECONDS;
14 import com.google.common.annotations.Beta;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import com.google.common.base.Stopwatch;
19 import com.google.common.base.Ticker;
20 import com.google.common.collect.ImmutableList;
21 import java.util.LinkedList;
22 import java.util.List;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
28 * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
29 * were received between the arrival of two instances of the same message and the amount of time it took to process each
32 * Usage of the API is as follows,
35 * // Track the Foo class. Here we expect to see a message of type Foo come in every 10 millis
36 * MessageTracker tracker = new MessageTracker(Foo.class, 10);
38 * // Begin the tracking process. Until this is called, received() hands back a no-op Context and nothing is tracked
44 * try (MessageTracker.Context context = tracker.received(message)) {
46 * if (context.error().isPresent()){
47 * LOG.error("{}", context.error().get());
50 * // Some custom processing
57 * This class is NOT thread-safe.
60 public final class MessageTracker {
61 public abstract static class Context implements AutoCloseable {
63 // Hidden to prevent outside instantiation
66 public abstract Optional<Error> error();
69 public abstract void close();
72 public interface Error {
73 Object getLastExpectedMessage();
75 Object getCurrentExpectedMessage();
77 List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
81 public static final class MessageProcessingTime {
82 private final Class<?> messageClass;
83 private final long elapsedTimeInNanos;
85 MessageProcessingTime(final Class<?> messageClass, final long elapsedTimeInNanos) {
86 this.messageClass = Preconditions.checkNotNull(messageClass);
87 this.elapsedTimeInNanos = elapsedTimeInNanos;
91 public String toString() {
92 return "MessageProcessingTime [messageClass=" + messageClass + ", elapsedTimeInMillis="
93 + NANOSECONDS.toMillis(elapsedTimeInNanos) + "]";
97 public Class<?> getMessageClass() {
101 public long getElapsedTimeInNanos() {
102 return elapsedTimeInNanos;
106 private static final Logger LOG = LoggerFactory.getLogger(MessageTracker.class);
107 private static final Context NO_OP_CONTEXT = new Context() {
109 public void close() {
114 public Optional<Error> error() {
115 return Optional.absent();
119 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
121 private final CurrentMessageContext currentMessageContext;
123 private final Stopwatch expectedMessageWatch;
125 private final Class<?> expectedMessageClass;
127 private final long expectedArrivalInterval;
129 private final Ticker ticker;
131 private Object lastExpectedMessage;
/**
 * Constructs an instance that reads time from the supplied ticker.
 *
 * @param expectedMessageClass the class of the message to track, must not be null
 * @param expectedArrivalIntervalInMillis the tolerated gap between two expected messages, in milliseconds
 * @param ticker the time source used by all stopwatches of this tracker, must not be null
 * @throws IllegalArgumentException if {@code expectedArrivalIntervalInMillis} is negative
 * @throws NullPointerException if {@code expectedMessageClass} or {@code ticker} is null
 */
MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis,
        final Ticker ticker) {
    // Validate everything up front, in the same order the checks have always been performed.
    Preconditions.checkArgument(expectedArrivalIntervalInMillis >= 0);
    Preconditions.checkNotNull(expectedMessageClass);
    Preconditions.checkNotNull(ticker);

    this.expectedMessageClass = expectedMessageClass;
    // The interval is kept in nanoseconds because that is the unit the stopwatch is queried in.
    this.expectedArrivalInterval = MILLISECONDS.toNanos(expectedArrivalIntervalInMillis);
    this.ticker = ticker;
    this.expectedMessageWatch = Stopwatch.createUnstarted(ticker);
    // Must be created last: the inner context reads this.ticker when it initializes its own stopwatch.
    this.currentMessageContext = new CurrentMessageContext();
}
/**
 * Constructs an instance that measures time with the system ticker.
 *
 * @param expectedMessageClass the class of the message to track
 * @param expectedArrivalIntervalInMillis the expected arrival interval between two instances of the expected
 *                                        message, in milliseconds
 * @throws IllegalArgumentException if {@code expectedArrivalIntervalInMillis} is negative
 * @throws NullPointerException if {@code expectedMessageClass} is null
 */
public MessageTracker(final Class<?> expectedMessageClass, final long expectedArrivalIntervalInMillis) {
    this(expectedMessageClass, expectedArrivalIntervalInMillis, Ticker.systemTicker());
}
155 public void begin() {
156 if (!expectedMessageWatch.isRunning()) {
157 LOG.trace("Started tracking class {} timeout {}ns", expectedMessageClass, expectedArrivalInterval);
158 expectedMessageWatch.start();
162 public Context received(final Object message) {
163 if (!expectedMessageWatch.isRunning()) {
164 return NO_OP_CONTEXT;
167 if (expectedMessageClass.isInstance(message)) {
168 final long actualElapsedTime = expectedMessageWatch.elapsed(NANOSECONDS);
169 if (actualElapsedTime > expectedArrivalInterval) {
170 return new ErrorContext(message, new FailedExpectation(lastExpectedMessage, message,
171 messagesSinceLastExpectedMessage, expectedArrivalInterval, actualElapsedTime));
173 lastExpectedMessage = message;
174 messagesSinceLastExpectedMessage.clear();
175 expectedMessageWatch.reset().start();
178 currentMessageContext.reset(message);
179 return currentMessageContext;
182 void processed(final Object message, final long messageElapseTimeInNanos) {
183 if (expectedMessageWatch.isRunning() && !expectedMessageClass.isInstance(message)) {
184 messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(),
185 messageElapseTimeInNanos));
189 public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage() {
190 return ImmutableList.copyOf(messagesSinceLastExpectedMessage);
193 private static final class FailedExpectation implements Error {
194 private final Object lastExpectedMessage;
195 private final Object currentExpectedMessage;
196 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
197 private final long expectedTimeInMillis;
198 private final long actualTimeInMillis;
200 FailedExpectation(final Object lastExpectedMessage, final Object message,
201 final List<MessageProcessingTime> messagesSinceLastExpectedMessage, final long expectedTimeNanos,
202 final long actualTimeNanos) {
203 this.lastExpectedMessage = lastExpectedMessage;
204 this.currentExpectedMessage = message;
205 this.messagesSinceLastExpectedMessage = ImmutableList.copyOf(messagesSinceLastExpectedMessage);
206 this.expectedTimeInMillis = NANOSECONDS.toMillis(expectedTimeNanos);
207 this.actualTimeInMillis = NANOSECONDS.toMillis(actualTimeNanos);
211 public Object getLastExpectedMessage() {
212 return lastExpectedMessage;
216 public Object getCurrentExpectedMessage() {
217 return currentExpectedMessage;
221 public List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage() {
222 return messagesSinceLastExpectedMessage;
226 public String toString() {
227 StringBuilder builder = new StringBuilder();
228 builder.append("\n> Last Expected Message = ").append(lastExpectedMessage);
229 builder.append("\n> Current Expected Message = ").append(currentExpectedMessage);
230 builder.append("\n> Expected time in between messages = ").append(expectedTimeInMillis);
231 builder.append("\n> Actual time in between messages = ").append(actualTimeInMillis);
232 for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
233 builder.append("\n\t> ").append(time);
235 return builder.toString();
239 private abstract class AbstractTimedContext extends Context {
240 abstract Object message();
242 abstract Stopwatch stopTimer();
245 public final void close() {
246 processed(message(), stopTimer().elapsed(NANOSECONDS));
250 private final class CurrentMessageContext extends AbstractTimedContext {
251 private final Stopwatch stopwatch = Stopwatch.createUnstarted(ticker);
252 private Object message;
254 void reset(final Object newMessage) {
255 this.message = Preconditions.checkNotNull(newMessage);
256 Preconditions.checkState(!stopwatch.isRunning(),
257 "Trying to reset a context that is not done (%s). currentMessage = %s", this, newMessage);
267 Stopwatch stopTimer() {
268 return stopwatch.stop();
272 public Optional<Error> error() {
273 return Optional.absent();
277 private final class ErrorContext extends AbstractTimedContext {
278 private final Stopwatch stopwatch = Stopwatch.createStarted(ticker);
279 private final Object message;
280 private final Error error;
282 ErrorContext(final Object message, final Error error) {
283 this.message = Preconditions.checkNotNull(message);
284 this.error = Preconditions.checkNotNull(error);
293 Stopwatch stopTimer() {
294 return stopwatch.stop();
298 public Optional<Error> error() {
299 return Optional.of(error);