BUG 2718 : Create a diagnostic utility to track append entries replies
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / MessageTracker.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Stopwatch;
14 import com.google.common.collect.ImmutableList;
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.concurrent.TimeUnit;
18
19 /**
20  * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
21  * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
22  * received between the arrival of two instances of the same message and the amount of time it took to process each
23  * of those messages.
24  * <br/>
25  * Usage of the API is as follows,
26  * <pre>
27  *
28  *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
29  *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
30  *
31  *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
32  *     // will do nothing
33  *     tracker.begin();
34  *
35  *     .....
36  *
37  *     MessageTracker.Context context = tracker.received(message);
38  *
39  *     if(context.error().isPresent()){
40  *         LOG.error("{}", context.error().get());
41  *     }
42  *
43  *     // Some custom processing
44  *     process(message);
45  *
46  *     context.done();
47  *
48  * </pre>
49  */
50 public class MessageTracker {
51
52     private static final Context NO_OP_CONTEXT = new NoOpContext();
53
54     private final Class expectedMessageClass;
55
56     private final long expectedArrivalInterval;
57
58     private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
59
60     private Stopwatch expectedMessageWatch;
61
62     private boolean enabled = false;
63
64     private Object lastExpectedMessage;
65
66     private Object currentMessage;
67
68     private final CurrentMessageContext currentMessageContext = new CurrentMessageContext();
69
70     /**
71      *
72      * @param expectedMessageClass The class of the message to track
73      * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
74      *                                        message
75      */
76     public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){
77         this.expectedMessageClass = expectedMessageClass;
78         this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
79     }
80
81     public void begin(){
82         if(enabled) {
83             return;
84         }
85         enabled = true;
86         expectedMessageWatch = Stopwatch.createStarted();
87     }
88
89     public Context received(Object message){
90         if(!enabled) {
91             return NO_OP_CONTEXT;
92         }
93         this.currentMessage = message;
94         if(expectedMessageClass.isInstance(message)){
95             long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS);
96             if(actualElapsedTime > expectedArrivalInterval){
97                 return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message,
98                         ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval,
99                         actualElapsedTime)));
100             }
101             this.lastExpectedMessage = message;
102             this.messagesSinceLastExpectedMessage.clear();
103         }
104
105         currentMessageContext.reset();
106         return currentMessageContext;
107     }
108
109     private void processed(Object message, long messageElapseTimeInNanos){
110         if(!enabled) {
111             return;
112         }
113         if(!expectedMessageClass.isInstance(message)){
114             this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos));
115         }
116     }
117
118     public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage(){
119         return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage);
120     }
121
122     public static class MessageProcessingTime {
123         private final Class messageClass;
124         private final long elapsedTimeInNanos;
125
126         MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){
127             this.messageClass = messageClass;
128             this.elapsedTimeInNanos = elapsedTimeInNanos;
129         }
130
131         @Override
132         public String toString() {
133             return "MessageProcessingTime{" +
134                     "messageClass=" + messageClass.getSimpleName() +
135                     ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) +
136                     '}';
137         }
138
139         public Class getMessageClass() {
140             return messageClass;
141         }
142
143         public long getElapsedTimeInNanos() {
144             return elapsedTimeInNanos;
145         }
146     }
147
148     public interface Error {
149         Object getLastExpectedMessage();
150         Object getCurrentExpectedMessage();
151         List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
152     }
153
154     private class FailedExpectation implements Error {
155
156         private final Object lastExpectedMessage;
157         private final Object currentExpectedMessage;
158         private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
159         private final long expectedTimeInMillis;
160         private final long actualTimeInMillis;
161
162         public FailedExpectation(Object lastExpectedMessage, Object message, List<MessageProcessingTime> messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) {
163             this.lastExpectedMessage = lastExpectedMessage;
164             this.currentExpectedMessage = message;
165             this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage;
166             this.expectedTimeInMillis = expectedTimeInMillis;
167             this.actualTimeInMillis = actualTimeInMillis;
168         }
169
170         public Object getLastExpectedMessage() {
171             return lastExpectedMessage;
172         }
173
174         public Object getCurrentExpectedMessage() {
175             return currentExpectedMessage;
176         }
177
178         public List<MessageProcessingTime>  getMessageProcessingTimesSinceLastExpectedMessage() {
179             return messagesSinceLastExpectedMessage;
180         }
181
182         @Override
183         public String toString() {
184             StringBuilder builder = new StringBuilder();
185             builder.append("\n> Last Expected Message = " + lastExpectedMessage);
186             builder.append("\n> Current Expected Message = " + currentExpectedMessage);
187             builder.append("\n> Expected time in between messages = " + expectedTimeInMillis);
188             builder.append("\n> Actual time in between messages = " + actualTimeInMillis);
189             for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
190                 builder.append("\n\t> ").append(time.toString());
191             }
192             return builder.toString();
193         }
194
195     }
196
197     public interface Context {
198         Context done();
199         Optional<? extends Error> error();
200     }
201
202     private static class NoOpContext implements Context {
203
204         @Override
205         public Context done() {
206             return this;
207         }
208
209         @Override
210         public Optional<Error> error() {
211             return Optional.absent();
212         }
213     }
214
215     private class CurrentMessageContext implements Context {
216         Stopwatch stopwatch = Stopwatch.createStarted();
217         boolean done = true;
218
219         public void reset(){
220             Preconditions.checkState(done);
221             done = false;
222             stopwatch.reset().start();
223         }
224
225         @Override
226         public Context done() {
227             processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS));
228             done = true;
229             return this;
230         }
231
232         @Override
233         public Optional<? extends Error> error() {
234             return Optional.absent();
235         }
236     }
237
238     private class ErrorContext implements Context {
239         Object message;
240         private final Optional<? extends Error> error;
241         Stopwatch stopwatch;
242
243         ErrorContext(Object message, Optional<? extends Error> error){
244             this.message = message;
245             this.error = error;
246             this.stopwatch = Stopwatch.createStarted();
247         }
248
249         @Override
250         public Context done(){
251             processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS));
252             this.stopwatch.stop();
253             return this;
254         }
255
256         @Override
257         public Optional<? extends Error> error() {
258             return error;
259         }
260     }
261 }