BUG-5280: add AbstractClientConnection
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / AbstractClientConnection.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.Optional;
17 import java.util.Queue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.function.Consumer;
20 import javax.annotation.Nonnull;
21 import javax.annotation.concurrent.GuardedBy;
22 import javax.annotation.concurrent.NotThreadSafe;
23 import org.opendaylight.controller.cluster.access.concepts.Request;
24 import org.opendaylight.controller.cluster.access.concepts.RequestException;
25 import org.opendaylight.controller.cluster.access.concepts.Response;
26 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.duration.FiniteDuration;
30
31 /**
32  * Base class for a connection to the backend. Responsible to queueing and dispatch of requests toward the backend.
33  * Can be in three conceptual states: Connecting, Connected and Reconnecting, which are represented by public final
34  * classes exposed from this package.
35  *
36  * @author Robert Varga
37  */
38 @NotThreadSafe
39 public abstract class AbstractClientConnection<T extends BackendInfo> {
40     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientConnection.class);
41
42     // Keep these constants in nanoseconds, as that prevents unnecessary conversions in the fast path
43     @VisibleForTesting
44     static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
45     @VisibleForTesting
46     static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
47
48     private final Queue<ConnectionEntry> pending;
49     private final ClientActorContext context;
50     private final Long cookie;
51
52     private volatile ReconnectForwarder successor;
53     private volatile RequestException poisoned;
54     private long lastProgress;
55
56     private AbstractClientConnection(final ClientActorContext context, final Long cookie,
57             final Queue<ConnectionEntry> pending) {
58         this.context = Preconditions.checkNotNull(context);
59         this.cookie = Preconditions.checkNotNull(cookie);
60         this.pending = Preconditions.checkNotNull(pending);
61         this.lastProgress = readTime();
62     }
63
64     // Do not allow subclassing outside of this package
65     AbstractClientConnection(final ClientActorContext context, final Long cookie) {
66         this(context, cookie, new ArrayDeque<>(1));
67     }
68
69     // Do not allow subclassing outside of this package
70     AbstractClientConnection(final AbstractClientConnection<T> oldConnection) {
71         this(oldConnection.context, oldConnection.cookie, oldConnection.pending);
72     }
73
74     public final ClientActorContext context() {
75         return context;
76     }
77
78     public final @Nonnull Long cookie() {
79         return cookie;
80     }
81
82     public final ActorRef localActor() {
83         return context.self();
84     }
85
86     final long readTime() {
87         return context.ticker().read();
88     }
89
90     final Queue<ConnectionEntry> pending() {
91         return pending;
92     }
93
94     /**
95      * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
96      * from any thread.
97      *
98      * @param request Request to send
99      * @param callback Callback to invoke
100      */
101     public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
102         Preconditions.checkState(poisoned == null, "Connection %s has been poisoned", this);
103
104         final ReconnectForwarder beforeQueue = successor;
105         final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
106         if (beforeQueue != null) {
107             LOG.trace("Forwarding entry {} from {} to {}", entry, this, beforeQueue);
108             beforeQueue.forwardEntry(entry);
109             return;
110         }
111
112         enqueueEntry(entry);
113
114         final ReconnectForwarder afterQueue = successor;
115         if (afterQueue != null) {
116             synchronized (this) {
117                 spliceToSuccessor(afterQueue);
118             }
119         }
120     }
121
122     public final synchronized void setForwarder(final ReconnectForwarder forwarder) {
123         Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
124         successor = Preconditions.checkNotNull(forwarder);
125         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
126         spliceToSuccessor(forwarder);
127     }
128
129     public abstract Optional<T> getBackendInfo();
130
131     @GuardedBy("this")
132     void spliceToSuccessor(final ReconnectForwarder successor) {
133         ConnectionEntry entry = pending.poll();
134         while (entry != null) {
135             successor.forwardEntry(entry);
136             entry = pending.poll();
137         }
138     }
139
140     final ConnectionEntry dequeEntry() {
141         lastProgress = readTime();
142         return pending.poll();
143     }
144
145     void enqueueEntry(final ConnectionEntry entry) {
146         pending.add(entry);
147     }
148
149     /**
150      * Schedule a timer to fire on the actor thread after a delay.
151      *
152      * @param delay Delay, in nanoseconds
153      */
154     private void scheduleTimer(final FiniteDuration delay) {
155         LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), delay);
156         context.executeInActor(this::runTimer, delay);
157     }
158
159     /**
160      * Check queue timeouts and return true if a timeout has occurred.
161      *
162      * @return True if a timeout occurred
163      * @throws NoProgressException if the queue failed to make progress for an extended
164      *                             time.
165      */
166     @VisibleForTesting
167     final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
168         final long now = readTime();
169
170         if (!isEmpty()) {
171             final long ticksSinceProgress = now - lastProgress;
172             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
173                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
174                     TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
175
176                 poison(new NoProgressException(ticksSinceProgress));
177                 current.removeConnection(this);
178                 return current;
179             }
180         }
181
182         // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
183         // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state
184         // return convention.
185         final Optional<FiniteDuration> delay = checkTimeout(now);
186         if (delay == null) {
187             // We have timed out. There is no point in scheduling a timer
188             return reconnectConnection(current);
189         }
190
191         if (delay.isPresent()) {
192             // If there is new delay, schedule a timer
193             scheduleTimer(delay.get());
194         }
195
196         return current;
197     }
198
199     boolean isEmpty() {
200         return pending.isEmpty();
201     }
202
203     /*
204      * We are using tri-state return here to indicate one of three conditions:
205      * - if there is no timeout to schedule, return Optional.empty()
206      * - if there is a timeout to schedule, return a non-empty optional
207      * - if this connections has timed out, return null
208      */
209     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
210             justification = "Returning null Optional is documented in the API contract.")
211     final Optional<FiniteDuration> checkTimeout(final ConnectionEntry head, final long now) {
212         if (head == null) {
213             return Optional.empty();
214         }
215
216         final long delay = head.getEnqueuedTicks() - now + REQUEST_TIMEOUT_NANOS;
217         if (delay <= 0) {
218             LOG.debug("Connection {} timed out", this);
219             return null;
220         }
221
222         return Optional.of(FiniteDuration.apply(delay, TimeUnit.NANOSECONDS));
223     }
224
225     /*
226      * We are using tri-state return here to indicate one of three conditions:
227      * - if there is no timeout to schedule, return Optional.empty()
228      * - if there is a timeout to schedule, return a non-empty optional
229      * - if this connections has timed out, return null
230      */
231     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
232             justification = "Returning null Optional is documented in the API contract.")
233     Optional<FiniteDuration> checkTimeout(final long now) {
234         return checkTimeout(pending.peek(), now);
235     }
236
237     static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
238         for (ConnectionEntry e : queue) {
239             final Request<?, ?> request = e.getRequest();
240             LOG.trace("Poisoning request {}", request, cause);
241             e.complete(request.toRequestFailure(cause));
242         }
243         queue.clear();
244     }
245
246     void poison(final RequestException cause) {
247         poisoned = cause;
248         poisonQueue(pending, cause);
249     }
250
251     @VisibleForTesting
252     final RequestException poisoned() {
253         return poisoned;
254     }
255
256     abstract ClientActorBehavior<T> reconnectConnection(ClientActorBehavior<T> current);
257
258     abstract void receiveResponse(final ResponseEnvelope<?> envelope);
259 }