0e8d1b9d56b8c5674268ca1ec21b9fb24b43ad3f
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / SequencedQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Ticker;
13 import com.google.common.base.Verify;
14 import java.util.ArrayDeque;
15 import java.util.Iterator;
16 import java.util.Optional;
17 import java.util.Queue;
18 import java.util.concurrent.CompletionStage;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.Nullable;
21 import javax.annotation.concurrent.NotThreadSafe;
22 import org.opendaylight.controller.cluster.access.concepts.Request;
23 import org.opendaylight.controller.cluster.access.concepts.RequestException;
24 import org.opendaylight.controller.cluster.access.concepts.Response;
25 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.duration.FiniteDuration;
29
30 /*
31  * A queue that processes entries in sequence.
32  *
33  * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
34  *       retries and enqueues work as expected.
35  */
36 @NotThreadSafe
37 final class SequencedQueue {
38     private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
39
40     // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
41     @VisibleForTesting
42     static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
43     @VisibleForTesting
44     static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
45     private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
46         TimeUnit.NANOSECONDS);
47
48     /**
49      * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
50      * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
51      * resolution.
52      */
53     private static final int DEFAULT_TX_LIMIT = 1000;
54
55     private final Ticker ticker;
56     private final Long cookie;
57
58     /*
59      * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
60      * with the limit changing between reconnects (which imply retransmission).
61      *
62      * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
63      * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
64      * and one for requests which have not been transmitted at all.
65      *
66      * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
67      * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
68      * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
69      * to retransmit -- hence the second queue.
70      */
71     private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
72     private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
73     private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
74
75     /**
76      * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
77      * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
78      * result.
79      */
80     private CompletionStage<? extends BackendInfo> backendProof;
81     private BackendInfo backend;
82
83     // This is not final because we need to be able to replace it.
84     private long txSequence;
85
86     private int lastTxLimit = DEFAULT_TX_LIMIT;
87
88     /**
89      * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
90      */
91     private Object expectingTimer;
92
93     private long lastProgress;
94
95     // Updated from application thread
96     private volatile boolean notClosed = true;
97
98     SequencedQueue(final Long cookie, final Ticker ticker) {
99         this.cookie = Preconditions.checkNotNull(cookie);
100         this.ticker = Preconditions.checkNotNull(ticker);
101         lastProgress = ticker.read();
102     }
103
104     Long getCookie() {
105         return cookie;
106     }
107
108     private void checkNotClosed() {
109         Preconditions.checkState(notClosed, "Queue %s is closed", this);
110     }
111
112     private long nextTxSequence() {
113         return txSequence++;
114     }
115
116     /**
117      * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
118      * the following scenarios:
119      * 1) The request has been enqueued and transmitted. No further actions are necessary
120      * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
121      * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
122      *    process needs to complete before transmission occurs
123      * <p/>
124      * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
125      * the scenarios above according to the following rules:
126      * - if is null, the first case applies
127      * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
128      *      resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
129      * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
130      *
131      * @param request Request to be sent
132      * @param callback Callback to be invoked
133      * @return Optional duration with semantics described above.
134      */
135     @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
136         checkNotClosed();
137
138         final long now = ticker.read();
139         final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
140         if (backend == null) {
141             LOG.debug("No backend available, request resolution");
142             pending.add(e);
143             return Optional.empty();
144         }
145         if (!lastInflight.isEmpty()) {
146             LOG.debug("Retransmit not yet complete, delaying request {}", request);
147             pending.add(e);
148             return null;
149         }
150         if (currentInflight.size() >= lastTxLimit) {
151             LOG.debug("Queue is at capacity, delayed sending of request {}", request);
152             pending.add(e);
153             return null;
154         }
155
156         // Ready to transmit
157         currentInflight.offer(e);
158         LOG.debug("Enqueued request {} to queue {}", request, this);
159
160         e.retransmit(backend, nextTxSequence(), now);
161         if (expectingTimer == null) {
162             expectingTimer = now + REQUEST_TIMEOUT_NANOS;
163             return Optional.of(INITIAL_REQUEST_TIMEOUT);
164         } else {
165             return null;
166         }
167     }
168
169     /*
170      * We are using tri-state return here to indicate one of three conditions:
171      * - if a matching entry is found, return an Optional containing it
172      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
173      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
174      */
175     private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
176             final ResponseEnvelope<?> envelope) {
177         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
178         // to use an iterator
179         final Iterator<SequencedQueueEntry> it = queue.iterator();
180         while (it.hasNext()) {
181             final SequencedQueueEntry e = it.next();
182             final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
183
184             final Request<?, ?> request = e.getRequest();
185             final Response<?, ?> response = envelope.getMessage();
186
187             // First check for matching target, or move to next entry
188             if (!request.getTarget().equals(response.getTarget())) {
189                 continue;
190             }
191
192             // Sanity-check logical sequence, ignore any out-of-order messages
193             if (request.getSequence() != response.getSequence()) {
194                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
195                 return Optional.empty();
196             }
197
198             // Now check session match
199             if (envelope.getSessionId() != txDetails.getSessionId()) {
200                 LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
201                 return Optional.empty();
202             }
203             if (envelope.getTxSequence() != txDetails.getTxSequence()) {
204                 LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
205                 return Optional.empty();
206             }
207
208             LOG.debug("Completing request {} with {}", request, envelope);
209             it.remove();
210             return Optional.of(e);
211         }
212
213         return null;
214     }
215
216     ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
217         Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
218         if (maybeEntry == null) {
219             maybeEntry = findMatchingEntry(lastInflight, envelope);
220         }
221
222         if (maybeEntry == null || !maybeEntry.isPresent()) {
223             LOG.warn("No request matching {} found, ignoring response", envelope);
224             return current;
225         }
226
227         lastProgress = ticker.read();
228         final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
229
230         // We have freed up a slot, try to transmit something
231         if (backend != null) {
232             final int toSend = lastTxLimit - currentInflight.size();
233             if (toSend > 0) {
234                 runTransmit(toSend);
235             }
236         }
237
238         return ret;
239     }
240
241     private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
242         int toSend = count;
243
244         while (toSend > 0) {
245             final SequencedQueueEntry e = queue.poll();
246             if (e == null) {
247                 break;
248             }
249
250             LOG.debug("Transmitting entry {}", e);
251             e.retransmit(backend, nextTxSequence(), lastProgress);
252             toSend--;
253         }
254
255         return toSend;
256     }
257
258     private void runTransmit(final int count) {
259         final int toSend;
260
261         // Process lastInflight first, possibly clearing it
262         if (!lastInflight.isEmpty()) {
263             toSend = transmitEntries(lastInflight, count);
264             if (lastInflight.isEmpty()) {
265                 // We won't be needing the queue anymore, change it to specialized implementation
266                 lastInflight = EmptyQueue.getInstance();
267             }
268         } else {
269             toSend = count;
270         }
271
272         // Process pending next.
273         transmitEntries(pending, toSend);
274     }
275
276     Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof,
277             final BackendInfo backend) {
278         Preconditions.checkNotNull(backend);
279         if (!proof.equals(backendProof)) {
280             LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
281             return Optional.empty();
282         }
283
284         LOG.debug("Resolved backend {}",  backend);
285
286         // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
287         // and also not to exceed new limits
288         final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
289         newLast.addAll(currentInflight);
290         newLast.addAll(lastInflight);
291         lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
292
293         // Clear currentInflight, possibly compacting it
294         final int txLimit = backend.getMaxMessages();
295         if (lastTxLimit > txLimit) {
296             currentInflight = new ArrayDeque<>();
297         } else {
298             currentInflight.clear();
299         }
300
301         // We are ready to roll
302         this.backend = backend;
303         backendProof = null;
304         txSequence = 0;
305         lastTxLimit = txLimit;
306         lastProgress = ticker.read();
307
308         // No pending requests, return
309         if (lastInflight.isEmpty() && pending.isEmpty()) {
310             return Optional.empty();
311         }
312
313         LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
314
315         runTransmit(lastTxLimit);
316
317         // Calculate next timer if necessary
318         if (expectingTimer == null) {
319             // Request transmission may have cost us some time. Recalculate timeout.
320             final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
321             expectingTimer = nextTicks;
322             return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
323         } else {
324             return Optional.empty();
325         }
326     }
327
328     boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
329         if (!proof.equals(backendProof)) {
330             LOG.debug("Setting resolution handle to {}", proof);
331             backendProof = proof;
332             return true;
333         } else {
334             LOG.trace("Already resolving handle {}", proof);
335             return false;
336         }
337     }
338
339     boolean hasCompleted() {
340         return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
341     }
342
343     /**
344      * Check queue timeouts and return true if a timeout has occured.
345      *
346      * @return True if a timeout occured
347      * @throws NoProgressException if the queue failed to make progress for an extended
348      *                             time.
349      */
350     boolean runTimeout() throws NoProgressException {
351         expectingTimer = null;
352         final long now = ticker.read();
353
354         if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
355             final long ticksSinceProgress = now - lastProgress;
356             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
357                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
358                     TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
359
360                 final NoProgressException ex = new NoProgressException(ticksSinceProgress);
361                 poison(ex);
362                 throw ex;
363             }
364         }
365
366         // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
367         final SequencedQueueEntry head = currentInflight.peek();
368         if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
369             backend = null;
370             LOG.debug("Queue {} invalidated backend info", this);
371             return true;
372         } else {
373             return false;
374         }
375     }
376
377     private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
378         queue.forEach(e -> e.poison(cause));
379         queue.clear();
380     }
381
382     void poison(final RequestException cause) {
383         close();
384
385         poisonQueue(currentInflight, cause);
386         poisonQueue(lastInflight, cause);
387         poisonQueue(pending, cause);
388     }
389
390     // FIXME: add a caller from ClientSingleTransaction
391     void close() {
392         notClosed = false;
393     }
394 }