596e353c981f3d21d193c66795753233c40bb6eb
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / SequencedQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Ticker;
13 import com.google.common.base.Verify;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.Iterator;
17 import java.util.Optional;
18 import java.util.Queue;
19 import java.util.concurrent.CompletionStage;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nullable;
22 import javax.annotation.concurrent.NotThreadSafe;
23 import org.opendaylight.controller.cluster.access.concepts.Request;
24 import org.opendaylight.controller.cluster.access.concepts.RequestException;
25 import org.opendaylight.controller.cluster.access.concepts.Response;
26 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.duration.FiniteDuration;
30
31 /*
32  * A queue that processes entries in sequence.
33  *
34  * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
35  *       retries and enqueues work as expected.
36  */
37 @NotThreadSafe
38 final class SequencedQueue {
39     private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
40
41     // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
42     @VisibleForTesting
43     static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
44     @VisibleForTesting
45     static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
46     private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
47         TimeUnit.NANOSECONDS);
48
49     /**
50      * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
51      * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
52      * resolution.
53      */
54     private static final int DEFAULT_TX_LIMIT = 1000;
55
56     private final Ticker ticker;
57     private final Long cookie;
58
59     /*
60      * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
61      * with the limit changing between reconnects (which imply retransmission).
62      *
63      * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
64      * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
65      * and one for requests which have not been transmitted at all.
66      *
67      * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
68      * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
69      * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
70      * to retransmit -- hence the second queue.
71      */
72     private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
73     private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
74     private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
75
76     /**
77      * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
78      * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
79      * result.
80      */
81     private CompletionStage<? extends BackendInfo> backendProof;
82     private BackendInfo backend;
83
84     // This is not final because we need to be able to replace it.
85     private long txSequence;
86
87     private int lastTxLimit = DEFAULT_TX_LIMIT;
88
89     /**
90      * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
91      */
92     private Object expectingTimer;
93
94     private long lastProgress;
95
96     // Updated from application thread
97     private volatile boolean notClosed = true;
98
99     SequencedQueue(final Long cookie, final Ticker ticker) {
100         this.cookie = Preconditions.checkNotNull(cookie);
101         this.ticker = Preconditions.checkNotNull(ticker);
102         lastProgress = ticker.read();
103     }
104
105     Long getCookie() {
106         return cookie;
107     }
108
109     private void checkNotClosed() {
110         Preconditions.checkState(notClosed, "Queue %s is closed", this);
111     }
112
113     private long nextTxSequence() {
114         return txSequence++;
115     }
116
117     /**
118      * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
119      * the following scenarios:
120      * 1) The request has been enqueued and transmitted. No further actions are necessary
121      * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
122      * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
123      *    process needs to complete before transmission occurs
124      * <p/>
125      * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
126      * the scenarios above according to the following rules:
127      * - if is null, the first case applies
128      * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
129      *      resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
130      * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
131      *
132      * @param request Request to be sent
133      * @param callback Callback to be invoked
134      * @return Optional duration with semantics described above.
135      */
136     @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
137         checkNotClosed();
138
139         final long now = ticker.read();
140         final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
141         if (backend == null) {
142             LOG.debug("No backend available, request resolution");
143             pending.add(e);
144             return Optional.empty();
145         }
146         if (!lastInflight.isEmpty()) {
147             LOG.debug("Retransmit not yet complete, delaying request {}", request);
148             pending.add(e);
149             return null;
150         }
151         if (currentInflight.size() >= lastTxLimit) {
152             LOG.debug("Queue is at capacity, delayed sending of request {}", request);
153             pending.add(e);
154             return null;
155         }
156
157         // Ready to transmit
158
159         if (currentInflight.offer(e)) {
160             LOG.debug("Enqueued request {} to queue {}", request, this);
161         } else {
162             // This shouldn't happen since the queue has unlimited capacity but check anyway to avoid FindBugs warning
163             // about checking return value.
164             LOG.warn("Fail to enqueued request {} to queue {}", request, this);
165         }
166
167         e.retransmit(backend, nextTxSequence(), now);
168         if (expectingTimer == null) {
169             expectingTimer = now + REQUEST_TIMEOUT_NANOS;
170             return Optional.of(INITIAL_REQUEST_TIMEOUT);
171         } else {
172             return null;
173         }
174     }
175
176     /*
177      * We are using tri-state return here to indicate one of three conditions:
178      * - if a matching entry is found, return an Optional containing it
179      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
180      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
181      */
182     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
183             justification = "Returning null Optional is documented in the API contract.")
184     private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
185             final ResponseEnvelope<?> envelope) {
186         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
187         // to use an iterator
188         final Iterator<SequencedQueueEntry> it = queue.iterator();
189         while (it.hasNext()) {
190             final SequencedQueueEntry e = it.next();
191             final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
192
193             final Request<?, ?> request = e.getRequest();
194             final Response<?, ?> response = envelope.getMessage();
195
196             // First check for matching target, or move to next entry
197             if (!request.getTarget().equals(response.getTarget())) {
198                 continue;
199             }
200
201             // Sanity-check logical sequence, ignore any out-of-order messages
202             if (request.getSequence() != response.getSequence()) {
203                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
204                 return Optional.empty();
205             }
206
207             // Now check session match
208             if (envelope.getSessionId() != txDetails.getSessionId()) {
209                 LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
210                 return Optional.empty();
211             }
212             if (envelope.getTxSequence() != txDetails.getTxSequence()) {
213                 LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
214                 return Optional.empty();
215             }
216
217             LOG.debug("Completing request {} with {}", request, envelope);
218             it.remove();
219             return Optional.of(e);
220         }
221
222         return null;
223     }
224
225     ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
226         Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
227         if (maybeEntry == null) {
228             maybeEntry = findMatchingEntry(lastInflight, envelope);
229         }
230
231         if (maybeEntry == null || !maybeEntry.isPresent()) {
232             LOG.warn("No request matching {} found, ignoring response", envelope);
233             return current;
234         }
235
236         lastProgress = ticker.read();
237         final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
238
239         // We have freed up a slot, try to transmit something
240         if (backend != null) {
241             final int toSend = lastTxLimit - currentInflight.size();
242             if (toSend > 0) {
243                 runTransmit(toSend);
244             }
245         }
246
247         return ret;
248     }
249
250     private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
251         int toSend = count;
252
253         while (toSend > 0) {
254             final SequencedQueueEntry e = queue.poll();
255             if (e == null) {
256                 break;
257             }
258
259             LOG.debug("Transmitting entry {}", e);
260             e.retransmit(backend, nextTxSequence(), lastProgress);
261             toSend--;
262         }
263
264         return toSend;
265     }
266
267     private void runTransmit(final int count) {
268         final int toSend;
269
270         // Process lastInflight first, possibly clearing it
271         if (!lastInflight.isEmpty()) {
272             toSend = transmitEntries(lastInflight, count);
273             if (lastInflight.isEmpty()) {
274                 // We won't be needing the queue anymore, change it to specialized implementation
275                 lastInflight = EmptyQueue.getInstance();
276             }
277         } else {
278             toSend = count;
279         }
280
281         // Process pending next.
282         transmitEntries(pending, toSend);
283     }
284
285     Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof,
286             final BackendInfo backend) {
287         Preconditions.checkNotNull(backend);
288         if (!proof.equals(backendProof)) {
289             LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
290             return Optional.empty();
291         }
292
293         LOG.debug("Resolved backend {}",  backend);
294
295         // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
296         // and also not to exceed new limits
297         final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
298         newLast.addAll(currentInflight);
299         newLast.addAll(lastInflight);
300         lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
301
302         // Clear currentInflight, possibly compacting it
303         final int txLimit = backend.getMaxMessages();
304         if (lastTxLimit > txLimit) {
305             currentInflight = new ArrayDeque<>();
306         } else {
307             currentInflight.clear();
308         }
309
310         // We are ready to roll
311         this.backend = backend;
312         backendProof = null;
313         txSequence = 0;
314         lastTxLimit = txLimit;
315         lastProgress = ticker.read();
316
317         // No pending requests, return
318         if (lastInflight.isEmpty() && pending.isEmpty()) {
319             return Optional.empty();
320         }
321
322         LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
323
324         runTransmit(lastTxLimit);
325
326         // Calculate next timer if necessary
327         if (expectingTimer == null) {
328             // Request transmission may have cost us some time. Recalculate timeout.
329             final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
330             expectingTimer = nextTicks;
331             return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
332         } else {
333             return Optional.empty();
334         }
335     }
336
337     boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
338         if (!proof.equals(backendProof)) {
339             LOG.debug("Setting resolution handle to {}", proof);
340             backendProof = proof;
341             return true;
342         } else {
343             LOG.trace("Already resolving handle {}", proof);
344             return false;
345         }
346     }
347
348     boolean hasCompleted() {
349         return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
350     }
351
352     /**
353      * Check queue timeouts and return true if a timeout has occured.
354      *
355      * @return True if a timeout occured
356      * @throws NoProgressException if the queue failed to make progress for an extended
357      *                             time.
358      */
359     boolean runTimeout() throws NoProgressException {
360         expectingTimer = null;
361         final long now = ticker.read();
362
363         if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
364             final long ticksSinceProgress = now - lastProgress;
365             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
366                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
367                     TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
368
369                 final NoProgressException ex = new NoProgressException(ticksSinceProgress);
370                 poison(ex);
371                 throw ex;
372             }
373         }
374
375         // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
376         final SequencedQueueEntry head = currentInflight.peek();
377         if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
378             backend = null;
379             LOG.debug("Queue {} invalidated backend info", this);
380             return true;
381         } else {
382             return false;
383         }
384     }
385
386     private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
387         queue.forEach(e -> e.poison(cause));
388         queue.clear();
389     }
390
391     void poison(final RequestException cause) {
392         close();
393
394         poisonQueue(currentInflight, cause);
395         poisonQueue(lastInflight, cause);
396         poisonQueue(pending, cause);
397     }
398
399     // FIXME: add a caller from ClientSingleTransaction
400     void close() {
401         notClosed = false;
402     }
403 }