BUG-5280: make EmptyQueue public
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / SequencedQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Ticker;
13 import com.google.common.base.Verify;
14 import java.util.ArrayDeque;
15 import java.util.Iterator;
16 import java.util.Optional;
17 import java.util.Queue;
18 import java.util.concurrent.CompletionStage;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.Nullable;
21 import javax.annotation.concurrent.NotThreadSafe;
22 import org.opendaylight.controller.cluster.access.concepts.Request;
23 import org.opendaylight.controller.cluster.access.concepts.RequestException;
24 import org.opendaylight.controller.cluster.access.concepts.Response;
25 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.duration.FiniteDuration;
29
30 /*
31  * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
32  *       retries and enqueues work as expected.
33  */
34 @NotThreadSafe
35 final class SequencedQueue {
36     private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
37
38     // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
39     @VisibleForTesting
40     static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
41     @VisibleForTesting
42     static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
43     private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
44         TimeUnit.NANOSECONDS);
45
46     /**
47      * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
48      * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
49      * resolution.
50      */
51     private static final int DEFAULT_TX_LIMIT = 1000;
52
53     private final Ticker ticker;
54     private final Long cookie;
55
56     /*
57      * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
58      * with the limit changing between reconnects (which imply retransmission).
59      *
60      * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
61      * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
62      * and one for requests which have not been transmitted at all.
63      *
64      * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
65      * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
66      * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
67      * to retransmit -- hence the second queue.
68      */
69     private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
70     private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
71     private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
72
73     /**
74      * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
75      * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
76      * result.
77      */
78     private CompletionStage<? extends BackendInfo> backendProof;
79     private BackendInfo backend;
80
81     // This is not final because we need to be able to replace it.
82     private long txSequence;
83
84     private int lastTxLimit = DEFAULT_TX_LIMIT;
85
86     /**
87      * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
88      */
89     private Object expectingTimer;
90
91     private long lastProgress;
92
93     // Updated from application thread
94     private volatile boolean notClosed = true;
95
96     SequencedQueue(final Long cookie, final Ticker ticker) {
97         this.cookie = Preconditions.checkNotNull(cookie);
98         this.ticker = Preconditions.checkNotNull(ticker);
99         lastProgress = ticker.read();
100     }
101
102     Long getCookie() {
103         return cookie;
104     }
105
106     private void checkNotClosed() {
107         Preconditions.checkState(notClosed, "Queue %s is closed", this);
108     }
109
110     private long nextTxSequence() {
111         return txSequence++;
112     }
113
114     /**
115      * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
116      * the following scenarios:
117      * 1) The request has been enqueued and transmitted. No further actions are necessary
118      * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
119      * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
120      *    process needs to complete before transmission occurs
121      *
122      * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
123      * the scenarios above according to the following rules:
124      * - if is null, the first case applies
125      * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
126      *      resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
127      * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
128      *
129      * @param request Request to be sent
130      * @param callback Callback to be invoked
131      * @return Optional duration with semantics described above.
132      */
133     @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
134         checkNotClosed();
135
136         final long now = ticker.read();
137         final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
138         if (backend == null) {
139             LOG.debug("No backend available, request resolution");
140             pending.add(e);
141             return Optional.empty();
142         }
143         if (!lastInflight.isEmpty()) {
144             LOG.debug("Retransmit not yet complete, delaying request {}", request);
145             pending.add(e);
146             return null;
147         }
148         if (currentInflight.size() >= lastTxLimit) {
149             LOG.debug("Queue is at capacity, delayed sending of request {}", request);
150             pending.add(e);
151             return null;
152         }
153
154         // Ready to transmit
155         currentInflight.offer(e);
156         LOG.debug("Enqueued request {} to queue {}", request, this);
157
158         e.retransmit(backend, nextTxSequence(), now);
159         if (expectingTimer == null) {
160             expectingTimer = now + REQUEST_TIMEOUT_NANOS;
161             return Optional.of(INITIAL_REQUEST_TIMEOUT);
162         } else {
163             return null;
164         }
165     }
166
167     /*
168      * We are using tri-state return here to indicate one of three conditions:
169      * - if a matching entry is found, return an Optional containing it
170      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
171      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
172      */
173     private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
174             final ResponseEnvelope<?> envelope) {
175         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
176         // to use an iterator
177         final Iterator<SequencedQueueEntry> it = queue.iterator();
178         while (it.hasNext()) {
179             final SequencedQueueEntry e = it.next();
180             final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
181
182             final Request<?, ?> request = e.getRequest();
183             final Response<?, ?> response = envelope.getMessage();
184
185             // First check for matching target, or move to next entry
186             if (!request.getTarget().equals(response.getTarget())) {
187                 continue;
188             }
189
190             // Sanity-check logical sequence, ignore any out-of-order messages
191             if (request.getSequence() != response.getSequence()) {
192                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
193                 return Optional.empty();
194             }
195
196             // Now check session match
197             if (envelope.getSessionId() != txDetails.getSessionId()) {
198                 LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
199                 return Optional.empty();
200             }
201             if (envelope.getTxSequence() != txDetails.getTxSequence()) {
202                 LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
203                 return Optional.empty();
204             }
205
206             LOG.debug("Completing request {} with {}", request, envelope);
207             it.remove();
208             return Optional.of(e);
209         }
210
211         return null;
212     }
213
214     ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
215         Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
216         if (maybeEntry == null) {
217             maybeEntry = findMatchingEntry(lastInflight, envelope);
218         }
219
220         if (maybeEntry == null || !maybeEntry.isPresent()) {
221             LOG.warn("No request matching {} found, ignoring response", envelope);
222             return current;
223         }
224
225         lastProgress = ticker.read();
226         final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
227
228         // We have freed up a slot, try to transmit something
229         if (backend != null) {
230             final int toSend = lastTxLimit - currentInflight.size();
231             if (toSend > 0) {
232                 runTransmit(toSend);
233             }
234         }
235
236         return ret;
237     }
238
239     private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
240         int toSend = count;
241
242         while (toSend > 0) {
243             final SequencedQueueEntry e = queue.poll();
244             if (e == null) {
245                 break;
246             }
247
248             LOG.debug("Transmitting entry {}", e);
249             e.retransmit(backend, nextTxSequence(), lastProgress);
250             toSend--;
251         }
252
253         return toSend;
254     }
255
256     private void runTransmit(final int count) {
257         final int toSend;
258
259         // Process lastInflight first, possibly clearing it
260         if (!lastInflight.isEmpty()) {
261             toSend = transmitEntries(lastInflight, count);
262             if (lastInflight.isEmpty()) {
263                 // We won't be needing the queue anymore, change it to specialized implementation
264                 lastInflight = EmptyQueue.getInstance();
265             }
266         } else {
267             toSend = count;
268         }
269
270         // Process pending next.
271         transmitEntries(pending, toSend);
272     }
273
274     Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
275         Preconditions.checkNotNull(backend);
276         if (!proof.equals(backendProof)) {
277             LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
278             return Optional.empty();
279         }
280
281         LOG.debug("Resolved backend {}",  backend);
282
283         // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
284         // and also not to exceed new limits
285         final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
286         newLast.addAll(currentInflight);
287         newLast.addAll(lastInflight);
288         lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
289
290         // Clear currentInflight, possibly compacting it
291         final int txLimit = backend.getMaxMessages();
292         if (lastTxLimit > txLimit) {
293             currentInflight = new ArrayDeque<>();
294         } else {
295             currentInflight.clear();
296         }
297
298         // We are ready to roll
299         this.backend = backend;
300         backendProof = null;
301         txSequence = 0;
302         lastTxLimit = txLimit;
303         lastProgress = ticker.read();
304
305         // No pending requests, return
306         if (lastInflight.isEmpty() && pending.isEmpty()) {
307             return Optional.empty();
308         }
309
310         LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
311
312         runTransmit(lastTxLimit);
313
314         // Calculate next timer if necessary
315         if (expectingTimer == null) {
316             // Request transmission may have cost us some time. Recalculate timeout.
317             final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
318             expectingTimer = nextTicks;
319             return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
320         } else {
321             return Optional.empty();
322         }
323     }
324
325     boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
326         if (!proof.equals(backendProof)) {
327             LOG.debug("Setting resolution handle to {}", proof);
328             backendProof = proof;
329             return true;
330         } else {
331             LOG.trace("Already resolving handle {}", proof);
332             return false;
333         }
334     }
335
336     boolean hasCompleted() {
337         return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
338     }
339
340     /**
341      * Check queue timeouts and return true if a timeout has occured.
342      *
343      * @return True if a timeout occured
344      * @throws NoProgressException if the queue failed to make progress for an extended
345      *                             time.
346      */
347     boolean runTimeout() throws NoProgressException {
348         expectingTimer = null;
349         final long now = ticker.read();
350
351         if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
352             final long ticksSinceProgress = now - lastProgress;
353             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
354                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
355                     TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
356
357                 final NoProgressException ex = new NoProgressException(ticksSinceProgress);
358                 poison(ex);
359                 throw ex;
360             }
361         }
362
363         // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
364         final SequencedQueueEntry head = currentInflight.peek();
365         if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
366             backend = null;
367             LOG.debug("Queue {} invalidated backend info", this);
368             return true;
369         } else {
370             return false;
371         }
372     }
373
374     private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
375         queue.forEach(e -> e.poison(cause));
376         queue.clear();
377     }
378
379     void poison(final RequestException cause) {
380         close();
381
382         poisonQueue(currentInflight, cause);
383         poisonQueue(lastInflight, cause);
384         poisonQueue(pending, cause);
385     }
386
387     // FIXME: add a caller from ClientSingleTransaction
388     void close() {
389         notClosed = false;
390     }
391 }