BUG-8422: Propagate enqueue time
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / AbstractClientConnection.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import com.google.common.base.Preconditions;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.Optional;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.Consumer;
21 import javax.annotation.Nonnull;
22 import javax.annotation.concurrent.GuardedBy;
23 import javax.annotation.concurrent.NotThreadSafe;
24 import org.opendaylight.controller.cluster.access.concepts.Request;
25 import org.opendaylight.controller.cluster.access.concepts.RequestException;
26 import org.opendaylight.controller.cluster.access.concepts.Response;
27 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.duration.FiniteDuration;
31
32 /**
33  * Base class for a connection to the backend. Responsible to queueing and dispatch of requests toward the backend.
34  * Can be in three conceptual states: Connecting, Connected and Reconnecting, which are represented by public final
35  * classes exposed from this package.
36  *
37  * @author Robert Varga
38  */
39 @NotThreadSafe
40 public abstract class AbstractClientConnection<T extends BackendInfo> {
41     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientConnection.class);
42
43     // Keep these constants in nanoseconds, as that prevents unnecessary conversions in the fast path
44     @VisibleForTesting
45     static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
46     @VisibleForTesting
47     static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
48
49     private static final FiniteDuration REQUEST_TIMEOUT_DURATION = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
50         TimeUnit.NANOSECONDS);
51
52     private final Lock lock = new ReentrantLock();
53     private final ClientActorContext context;
54     @GuardedBy("lock")
55     private final TransmitQueue queue;
56     private final Long cookie;
57
58     @GuardedBy("lock")
59     private boolean haveTimer;
60
61     private volatile RequestException poisoned;
62
63     // Do not allow subclassing outside of this package
64     AbstractClientConnection(final ClientActorContext context, final Long cookie,
65             final TransmitQueue queue) {
66         this.context = Preconditions.checkNotNull(context);
67         this.cookie = Preconditions.checkNotNull(cookie);
68         this.queue = Preconditions.checkNotNull(queue);
69     }
70
71     // Do not allow subclassing outside of this package
72     AbstractClientConnection(final AbstractClientConnection<T> oldConnection, final int targetQueueSize) {
73         this.context = oldConnection.context;
74         this.cookie = oldConnection.cookie;
75         this.queue = new TransmitQueue.Halted(targetQueueSize);
76     }
77
78     public final ClientActorContext context() {
79         return context;
80     }
81
82     public final @Nonnull Long cookie() {
83         return cookie;
84     }
85
86     public final ActorRef localActor() {
87         return context.self();
88     }
89
90     public final long currentTime() {
91         return context.ticker().read();
92     }
93
94     /**
95      * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
96      * from any thread.
97      *
98      * <p>This method may put the caller thread to sleep in order to throttle the request rate.
99      * The callback may be called before the sleep finishes.
100      *
101      * @param request Request to send
102      * @param callback Callback to invoke
103      */
104     public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
105         final long now = currentTime();
106         final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
107         try {
108             TimeUnit.NANOSECONDS.sleep(delay);
109         } catch (InterruptedException e) {
110             Thread.currentThread().interrupt();
111             LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
112         }
113     }
114
115     /**
116      * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
117      * from any thread.
118      *
119      * <p>
120      * Note that unlike {@link #sendRequest(Request, Consumer)}, this method does not exert backpressure, hence it
121      * should never be called from an application thread.
122      *
123      * @param request Request to send
124      * @param callback Callback to invoke
125      * @param enqueuedTicks Time (according to {@link #currentTime()} of request enqueue
126      */
127     public final void enqueueRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
128             final long enqueuedTicks) {
129         enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
130     }
131
132     public abstract Optional<T> getBackendInfo();
133
134     final Iterable<ConnectionEntry> startReplay() {
135         lock.lock();
136         return queue.asIterable();
137     }
138
139     @GuardedBy("lock")
140     final void finishReplay(final ReconnectForwarder forwarder) {
141         setForwarder(forwarder);
142         lock.unlock();
143     }
144
145     @GuardedBy("lock")
146     final void setForwarder(final ReconnectForwarder forwarder) {
147         queue.setForwarder(forwarder, currentTime());
148     }
149
150     @GuardedBy("lock")
151     abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
152
153     final long enqueueEntry(final ConnectionEntry entry, final long now) {
154         lock.lock();
155         try {
156             final RequestException maybePoison = poisoned;
157             if (maybePoison != null) {
158                 throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
159             }
160
161             if (queue.isEmpty()) {
162                 // The queue is becoming non-empty, schedule a timer
163                 scheduleTimer(REQUEST_TIMEOUT_DURATION);
164             }
165             return queue.enqueue(entry, now);
166         } finally {
167             lock.unlock();
168         }
169     }
170
171     final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
172         lock.lock();
173         try {
174             return lockedReconnect(current);
175         } finally {
176             lock.unlock();
177         }
178     }
179
180     /**
181      * Schedule a timer to fire on the actor thread after a delay.
182      *
183      * @param delay Delay, in nanoseconds
184      */
185     @GuardedBy("lock")
186     private void scheduleTimer(final FiniteDuration delay) {
187         if (haveTimer) {
188             LOG.debug("{}: timer already scheduled", context.persistenceId());
189             return;
190         }
191         if (queue.hasSuccessor()) {
192             LOG.debug("{}: connection has successor, not scheduling timer", context.persistenceId());
193             return;
194         }
195         LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), delay);
196         context.executeInActor(this::runTimer, delay);
197         haveTimer = true;
198     }
199
200     /**
201      * Check this queue for timeout and initiate reconnection if that happened. If the queue has not made progress
202      * in {@link #NO_PROGRESS_TIMEOUT_NANOS} nanoseconds, it will be aborted.
203      *
204      * @param current Current behavior
205      * @return Next behavior to use
206      */
207     @VisibleForTesting
208     final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
209         final Optional<FiniteDuration> delay;
210
211         lock.lock();
212         try {
213             haveTimer = false;
214             final long now = currentTime();
215             // The following line is only reliable when queue is not forwarding, but such state should not last long.
216             final long ticksSinceProgress = queue.ticksStalling(now);
217             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
218                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
219                     TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
220
221                 lockedPoison(new NoProgressException(ticksSinceProgress));
222                 current.removeConnection(this);
223                 return current;
224             }
225
226             // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
227             // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state
228             // return convention.
229             delay = lockedCheckTimeout(now);
230             if (delay == null) {
231                 // We have timed out. There is no point in scheduling a timer
232                 return lockedReconnect(current);
233             }
234
235             if (delay.isPresent()) {
236                 // If there is new delay, schedule a timer
237                 scheduleTimer(delay.get());
238             }
239         } finally {
240             lock.unlock();
241         }
242
243         return current;
244     }
245
246     @VisibleForTesting
247     final Optional<FiniteDuration> checkTimeout(final long now) {
248         lock.lock();
249         try {
250             return lockedCheckTimeout(now);
251         } finally {
252             lock.unlock();
253         }
254     }
255
256     /*
257      * We are using tri-state return here to indicate one of three conditions:
258      * - if there is no timeout to schedule, return Optional.empty()
259      * - if there is a timeout to schedule, return a non-empty optional
260      * - if this connections has timed out, return null
261      */
262     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
263             justification = "Returning null Optional is documented in the API contract.")
264     @GuardedBy("lock")
265     private Optional<FiniteDuration> lockedCheckTimeout(final long now) {
266         final ConnectionEntry head = queue.peek();
267         if (head == null) {
268             return Optional.empty();
269         }
270
271         final long beenOpen = now - head.getEnqueuedTicks();
272         if (beenOpen >= REQUEST_TIMEOUT_NANOS) {
273             LOG.debug("Connection {} has a request not completed for {} nanoseconds, timing out", this, beenOpen);
274             return null;
275         }
276
277         return Optional.of(FiniteDuration.apply(REQUEST_TIMEOUT_NANOS - beenOpen, TimeUnit.NANOSECONDS));
278     }
279
280     final void poison(final RequestException cause) {
281         lock.lock();
282         try {
283             lockedPoison(cause);
284         } finally {
285             lock.unlock();
286         }
287     }
288
289     @GuardedBy("lock")
290     private void lockedPoison(final RequestException cause) {
291         poisoned = cause;
292         queue.poison(cause);
293     }
294
295     @VisibleForTesting
296     final RequestException poisoned() {
297         return poisoned;
298     }
299
300     final void receiveResponse(final ResponseEnvelope<?> envelope) {
301         final long now = currentTime();
302
303         final Optional<TransmittedConnectionEntry> maybeEntry;
304         lock.lock();
305         try {
306             maybeEntry = queue.complete(envelope, now);
307         } finally {
308             lock.unlock();
309         }
310
311         if (maybeEntry.isPresent()) {
312             final TransmittedConnectionEntry entry = maybeEntry.get();
313             LOG.debug("Completing {} with {}", entry, envelope);
314             entry.complete(envelope.getMessage());
315         }
316     }
317
318     @Override
319     public final String toString() {
320         return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues()).toString();
321     }
322
323     ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
324         return toStringHelper.add("client", context.getIdentifier()).add("cookie", cookie).add("poisoned", poisoned);
325     }
326 }