BUG-5280: refactor AbstractClientConnection
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / AbstractClientConnection.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.Iterator;
17 import java.util.Map.Entry;
18 import java.util.Optional;
19 import java.util.Queue;
20 import java.util.concurrent.TimeUnit;
21 import java.util.function.Consumer;
22 import javax.annotation.Nonnull;
23 import javax.annotation.concurrent.GuardedBy;
24 import javax.annotation.concurrent.NotThreadSafe;
25 import org.opendaylight.controller.cluster.access.concepts.Request;
26 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
27 import org.opendaylight.controller.cluster.access.concepts.RequestException;
28 import org.opendaylight.controller.cluster.access.concepts.Response;
29 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import scala.concurrent.duration.FiniteDuration;
33
34 /**
35  * Base class for a connection to the backend. Responsible to queueing and dispatch of requests toward the backend.
36  * Can be in three conceptual states: Connecting, Connected and Reconnecting, which are represented by public final
37  * classes exposed from this package.
38  *
39  * @author Robert Varga
40  */
41 @NotThreadSafe
42 public abstract class AbstractClientConnection<T extends BackendInfo> {
43     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientConnection.class);
44
45     // Keep these constants in nanoseconds, as that prevents unnecessary conversions in the fast path
46     @VisibleForTesting
47     static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
48     @VisibleForTesting
49     static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
50
51     private final Queue<TransmittedConnectionEntry> inflight;
52     private final Queue<ConnectionEntry> pending;
53
54     private final ClientActorContext context;
55     private final Long cookie;
56
57     private volatile ReconnectForwarder successor;
58     private volatile RequestException poisoned;
59     private long lastProgress;
60
61     private AbstractClientConnection(final ClientActorContext context, final Long cookie,
62             final Queue<TransmittedConnectionEntry> inflight, final Queue<ConnectionEntry> pending) {
63         this.context = Preconditions.checkNotNull(context);
64         this.cookie = Preconditions.checkNotNull(cookie);
65         this.inflight = Preconditions.checkNotNull(inflight);
66         this.pending = Preconditions.checkNotNull(pending);
67         this.lastProgress = readTime();
68     }
69
70     // Do not allow subclassing outside of this package
71     AbstractClientConnection(final ClientActorContext context, final Long cookie) {
72         this(context, cookie, new ArrayDeque<>(), new ArrayDeque<>(1));
73     }
74
75     // Do not allow subclassing outside of this package
76     AbstractClientConnection(final AbstractClientConnection<T> oldConnection) {
77         this(oldConnection.context, oldConnection.cookie, oldConnection.inflight, oldConnection.pending);
78     }
79
80     public final ClientActorContext context() {
81         return context;
82     }
83
84     public final @Nonnull Long cookie() {
85         return cookie;
86     }
87
88     public final ActorRef localActor() {
89         return context.self();
90     }
91
92     /**
93      * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
94      * from any thread.
95      *
96      * @param request Request to send
97      * @param callback Callback to invoke
98      */
99     public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
100         Preconditions.checkState(poisoned == null, "Connection %s has been poisoned", this);
101
102         final ReconnectForwarder beforeQueue = successor;
103         final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
104         if (beforeQueue != null) {
105             LOG.trace("Forwarding entry {} from {} to {}", entry, this, beforeQueue);
106             beforeQueue.forwardEntry(entry);
107             return;
108         }
109
110         enqueueEntry(entry);
111
112         final ReconnectForwarder afterQueue = successor;
113         if (afterQueue != null) {
114             synchronized (this) {
115                 spliceToSuccessor(afterQueue);
116             }
117         }
118     }
119
120     public final synchronized void setForwarder(final ReconnectForwarder forwarder) {
121         Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
122         successor = Preconditions.checkNotNull(forwarder);
123         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
124         spliceToSuccessor(forwarder);
125     }
126
127     public abstract Optional<T> getBackendInfo();
128
129     abstract ClientActorBehavior<T> reconnectConnection(ClientActorBehavior<T> current);
130
131     abstract int remoteMaxMessages();
132
133     abstract Entry<ActorRef, RequestEnvelope> prepareForTransmit(Request<?, ?> req);
134
135     @GuardedBy("this")
136     final void spliceToSuccessor(final ReconnectForwarder successor) {
137         ConnectionEntry entry = inflight.poll();
138         while (entry != null) {
139             successor.forwardEntry(entry);
140             entry = inflight.poll();
141         }
142
143         entry = pending.poll();
144         while (entry != null) {
145             successor.forwardEntry(entry);
146             entry = pending.poll();
147         }
148     }
149
150     private long readTime() {
151         return context.ticker().read();
152     }
153
154     private void transmit(final ConnectionEntry entry) {
155         final Entry<ActorRef, RequestEnvelope> tuple = prepareForTransmit(entry.getRequest());
156         final RequestEnvelope req = tuple.getValue();
157
158         // We need to enqueue the request before we send it to the actor, as we may be executing on a different thread
159         // than the client actor thread, in which case the round-trip could be made faster than we can enqueue --
160         // in which case the receive routine would not find the entry.
161         final TransmittedConnectionEntry txEntry = new TransmittedConnectionEntry(entry, req.getSessionId(),
162             req.getTxSequence(), readTime());
163         inflight.add(txEntry);
164
165         final ActorRef actor = tuple.getKey();
166         LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), req, actor);
167         actor.tell(req, ActorRef.noSender());
168     }
169
170     final void enqueueEntry(final ConnectionEntry entry) {
171         if (inflight.size() < remoteMaxMessages()) {
172             transmit(entry);
173             LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this);
174         } else {
175             LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest());
176             pending.add(entry);
177         }
178     }
179
180     /**
181      * Schedule a timer to fire on the actor thread after a delay.
182      *
183      * @param delay Delay, in nanoseconds
184      */
185     private void scheduleTimer(final FiniteDuration delay) {
186         LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), delay);
187         context.executeInActor(this::runTimer, delay);
188     }
189
190     /**
191      * Check this queue for timeout and initiate reconnection if that happened. If the queue has not made progress
192      * in {@link #NO_PROGRESS_TIMEOUT_NANOS} nanoseconds, it will be aborted.
193      *
194      * @param current Current behavior
195      * @return Next behavior to use
196      */
197     @VisibleForTesting
198     final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
199         final long now = readTime();
200
201         if (!inflight.isEmpty() || !pending.isEmpty()) {
202             final long ticksSinceProgress = now - lastProgress;
203             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
204                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
205                     TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
206
207                 poison(new NoProgressException(ticksSinceProgress));
208                 current.removeConnection(this);
209                 return current;
210             }
211         }
212
213         // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
214         // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state
215         // return convention.
216         final Optional<FiniteDuration> delay = checkTimeout(now);
217         if (delay == null) {
218             // We have timed out. There is no point in scheduling a timer
219             return reconnectConnection(current);
220         }
221
222         if (delay.isPresent()) {
223             // If there is new delay, schedule a timer
224             scheduleTimer(delay.get());
225         }
226
227         return current;
228     }
229
230     /*
231      * We are using tri-state return here to indicate one of three conditions:
232      * - if there is no timeout to schedule, return Optional.empty()
233      * - if there is a timeout to schedule, return a non-empty optional
234      * - if this connections has timed out, return null
235      */
236     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
237             justification = "Returning null Optional is documented in the API contract.")
238     private Optional<FiniteDuration> checkTimeout(final ConnectionEntry head, final long now) {
239         if (head == null) {
240             return Optional.empty();
241         }
242
243         final long delay = head.getEnqueuedTicks() - now + REQUEST_TIMEOUT_NANOS;
244         if (delay <= 0) {
245             LOG.debug("Connection {} timed out", this);
246             return null;
247         }
248
249         return Optional.of(FiniteDuration.apply(delay, TimeUnit.NANOSECONDS));
250     }
251
252     /*
253      * We are using tri-state return here to indicate one of three conditions:
254      * - if there is no timeout to schedule, return Optional.empty()
255      * - if there is a timeout to schedule, return a non-empty optional
256      * - if this connections has timed out, return null
257      */
258     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
259             justification = "Returning null Optional is documented in the API contract.")
260     @VisibleForTesting
261     final Optional<FiniteDuration> checkTimeout(final long now) {
262         final Optional<FiniteDuration> xmit = checkTimeout(inflight.peek(), now);
263         if (xmit == null) {
264             return null;
265         }
266         final Optional<FiniteDuration> pend = checkTimeout(pending.peek(), now);
267         if (pend == null) {
268             return null;
269         }
270         if (!xmit.isPresent()) {
271             return pend;
272         }
273         if (!pend.isPresent()) {
274             return xmit;
275         }
276
277         return Optional.of(xmit.get().min(pend.get()));
278     }
279
280     final void poison(final RequestException cause) {
281         poisoned = cause;
282
283         poisonQueue(inflight, cause);
284         poisonQueue(pending, cause);
285     }
286
287     @VisibleForTesting
288     final RequestException poisoned() {
289         return poisoned;
290     }
291
292     final void receiveResponse(final ResponseEnvelope<?> envelope) {
293         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
294         if (maybeEntry == null) {
295             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
296             maybeEntry = findMatchingEntry(pending, envelope);
297         }
298
299         if (maybeEntry == null || !maybeEntry.isPresent()) {
300             LOG.warn("No request matching {} found, ignoring response", envelope);
301             return;
302         }
303
304         final TransmittedConnectionEntry entry = maybeEntry.get();
305         LOG.debug("Completing {} with {}", entry, envelope);
306         entry.complete(envelope.getMessage());
307
308         // We have freed up a slot, try to transmit something
309         int toSend = remoteMaxMessages() - inflight.size();
310         while (toSend > 0) {
311             final ConnectionEntry e = pending.poll();
312             if (e == null) {
313                 break;
314             }
315
316             LOG.debug("Transmitting entry {}", e);
317             transmit(e);
318             toSend--;
319         }
320
321         lastProgress = readTime();
322     }
323
324     private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
325         for (ConnectionEntry e : queue) {
326             final Request<?, ?> request = e.getRequest();
327             LOG.trace("Poisoning request {}", request, cause);
328             e.complete(request.toRequestFailure(cause));
329         }
330         queue.clear();
331     }
332
333     /*
334      * We are using tri-state return here to indicate one of three conditions:
335      * - if a matching entry is found, return an Optional containing it
336      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
337      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
338      */
339     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
340             justification = "Returning null Optional is documented in the API contract.")
341     private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
342             final ResponseEnvelope<?> envelope) {
343         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
344         // to use an iterator
345         final Iterator<? extends ConnectionEntry> it = queue.iterator();
346         while (it.hasNext()) {
347             final ConnectionEntry e = it.next();
348             final Request<?, ?> request = e.getRequest();
349             final Response<?, ?> response = envelope.getMessage();
350
351             // First check for matching target, or move to next entry
352             if (!request.getTarget().equals(response.getTarget())) {
353                 continue;
354             }
355
356             // Sanity-check logical sequence, ignore any out-of-order messages
357             if (request.getSequence() != response.getSequence()) {
358                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
359                 return Optional.empty();
360             }
361
362             // Check if the entry has (ever) been transmitted
363             if (!(e instanceof TransmittedConnectionEntry)) {
364                 return Optional.empty();
365             }
366
367             final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
368
369             // Now check session match
370             if (envelope.getSessionId() != te.getSessionId()) {
371                 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
372                 return Optional.empty();
373             }
374             if (envelope.getTxSequence() != te.getTxSequence()) {
375                 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
376                 return Optional.empty();
377             }
378
379             LOG.debug("Completing request {} with {}", request, envelope);
380             it.remove();
381             return Optional.of(te);
382         }
383
384         return null;
385     }
386 }