2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Stopwatch;
14 import com.google.common.collect.ImmutableList;
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.concurrent.TimeUnit;
20 * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
21 * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
22 * received between the arrival of two instances of the same message and the amount of time it took to process each
25 * Usage of the API is as follows,
28 * // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
29 * MessageTracker tracker = new MessageTracker(Foo.class, 10);
31 * // Begin the tracking process. If this is not called then calling received and done on the resultant Context
37 * MessageTracker.Context context = tracker.received(message);
39 * if(context.error().isPresent()){
40 * LOG.error("{}", context.error().get());
43 * // Some custom processing
50 public class MessageTracker {
52 private static final Context NO_OP_CONTEXT = new NoOpContext();
54 private final Class<?> expectedMessageClass;
56 private final long expectedArrivalInterval;
58 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
60 private Stopwatch expectedMessageWatch;
62 private boolean enabled = false;
64 private Object lastExpectedMessage;
66 private Object currentMessage;
68 private final CurrentMessageContext currentMessageContext = new CurrentMessageContext();
72 * @param expectedMessageClass The class of the message to track
73 * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
76 public MessageTracker(Class<?> expectedMessageClass, long expectedArrivalIntervalInMillis){
77 this.expectedMessageClass = expectedMessageClass;
78 this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
86 expectedMessageWatch = Stopwatch.createStarted();
89 public Context received(Object message){
93 this.currentMessage = message;
94 if(expectedMessageClass.isInstance(message)){
95 long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS);
96 if(actualElapsedTime > expectedArrivalInterval){
97 return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message,
98 ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval,
101 this.lastExpectedMessage = message;
102 this.messagesSinceLastExpectedMessage.clear();
105 currentMessageContext.reset();
106 return currentMessageContext;
109 private void processed(Object message, long messageElapseTimeInNanos){
113 if(!expectedMessageClass.isInstance(message)){
114 this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos));
118 public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage(){
119 return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage);
122 public static class MessageProcessingTime {
123 private final Class<?> messageClass;
124 private final long elapsedTimeInNanos;
126 MessageProcessingTime(Class<?> messageClass, long elapsedTimeInNanos){
127 this.messageClass = messageClass;
128 this.elapsedTimeInNanos = elapsedTimeInNanos;
132 public String toString() {
133 return "MessageProcessingTime{" +
134 "messageClass=" + messageClass.getSimpleName() +
135 ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) +
139 public Class<?> getMessageClass() {
143 public long getElapsedTimeInNanos() {
144 return elapsedTimeInNanos;
148 public interface Error {
149 Object getLastExpectedMessage();
150 Object getCurrentExpectedMessage();
151 List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
154 private class FailedExpectation implements Error {
156 private final Object lastExpectedMessage;
157 private final Object currentExpectedMessage;
158 private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
159 private final long expectedTimeInMillis;
160 private final long actualTimeInMillis;
162 public FailedExpectation(Object lastExpectedMessage, Object message, List<MessageProcessingTime> messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) {
163 this.lastExpectedMessage = lastExpectedMessage;
164 this.currentExpectedMessage = message;
165 this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage;
166 this.expectedTimeInMillis = expectedTimeInMillis;
167 this.actualTimeInMillis = actualTimeInMillis;
170 public Object getLastExpectedMessage() {
171 return lastExpectedMessage;
174 public Object getCurrentExpectedMessage() {
175 return currentExpectedMessage;
178 public List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage() {
179 return messagesSinceLastExpectedMessage;
183 public String toString() {
184 StringBuilder builder = new StringBuilder();
185 builder.append("\n> Last Expected Message = " + lastExpectedMessage);
186 builder.append("\n> Current Expected Message = " + currentExpectedMessage);
187 builder.append("\n> Expected time in between messages = " + expectedTimeInMillis);
188 builder.append("\n> Actual time in between messages = " + actualTimeInMillis);
189 for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
190 builder.append("\n\t> ").append(time.toString());
192 return builder.toString();
197 public interface Context {
199 Optional<? extends Error> error();
202 private static class NoOpContext implements Context {
205 public Context done() {
210 public Optional<Error> error() {
211 return Optional.absent();
215 private class CurrentMessageContext implements Context {
216 Stopwatch stopwatch = Stopwatch.createStarted();
220 Preconditions.checkState(done,
221 String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage));
223 stopwatch.reset().start();
227 public Context done() {
228 processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS));
234 public Optional<? extends Error> error() {
235 return Optional.absent();
239 private class ErrorContext implements Context {
241 private final Optional<? extends Error> error;
244 ErrorContext(Object message, Optional<? extends Error> error){
245 this.message = message;
247 this.stopwatch = Stopwatch.createStarted();
251 public Context done(){
252 processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS));
253 this.stopwatch.stop();
258 public Optional<? extends Error> error() {