2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Ticker;
13 import com.google.common.base.Verify;
14 import java.util.ArrayDeque;
15 import java.util.Iterator;
16 import java.util.Optional;
17 import java.util.Queue;
18 import java.util.concurrent.CompletionStage;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.Nullable;
21 import javax.annotation.concurrent.NotThreadSafe;
22 import org.opendaylight.controller.cluster.access.concepts.Request;
23 import org.opendaylight.controller.cluster.access.concepts.RequestException;
24 import org.opendaylight.controller.cluster.access.concepts.Response;
25 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.duration.FiniteDuration;
31 * A queue that processes entries in sequence.
33 * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
34 * retries and enqueues work as expected.
37 final class SequencedQueue {
38 private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
40 // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
// Compared against (now - lastProgress) in runTimeout(); reaching it is logged as the queue having made no progress
42 static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
// Per-request timeout: added to the current tick when a timer is armed, and passed to isTimedOut() for the
// head of currentInflight in runTimeout()
44 static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
// REQUEST_TIMEOUT_NANOS as a FiniteDuration, built once; this is what enqueueRequest() hands back when it arms a timer
45 private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
46 TimeUnit.NANOSECONDS);
49 * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
50 * we will use its advertised {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
53 private static final int DEFAULT_TX_LIMIT = 1000;
// Source of every timestamp this queue takes (always via ticker.read())
55 private final Ticker ticker;
// Supplied at construction and checked to be non-null there
56 private final Long cookie;
59 * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
60 * with the limit changing between reconnects (which imply retransmission).
62 * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
63 * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
64 * and one for requests which have not been transmitted at all.
66 * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
67 * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
68 * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
69 * to retransmit -- hence the second queue.
// Not final: setBackendInfo() may replace this deque with a fresh one
71 private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
// Not final: rebuilt by setBackendInfo() and swapped for EmptyQueue.getInstance() once drained
72 private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
73 private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
76 * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
77 * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
80 private CompletionStage<? extends BackendInfo> backendProof;
// Backend that requests are transmitted to; null means none is available and enqueueRequest() asks for resolution
81 private BackendInfo backend;
83 // This is not final because we need to be able to replace it.
// NOTE(review): the comment above reads oddly for a primitive counter and may be stale -- confirm what it refers to
84 private long txSequence;
// Upper bound on currentInflight.size() before new requests are held back; overwritten with
// backend.getMaxMessages() each time setBackendInfo() accepts a backend
86 private int lastTxLimit = DEFAULT_TX_LIMIT;
89 * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
// Holds the (boxed) expiry tick while a timer is outstanding; reset to null at the start of runTimeout()
91 private Object expectingTimer;
// Tick of the most recent progress: construction, a matched response in complete(), or an accepted setBackendInfo()
93 private long lastProgress;
95 // Updated from application thread
// checkNotClosed() throws while this is false; hasCompleted() can only report true once it is false
96 private volatile boolean notClosed = true;
// Creates an empty queue. Either argument being null is rejected by Preconditions.checkNotNull
// (NullPointerException).
98 SequencedQueue(final Long cookie, final Ticker ticker) {
99 this.cookie = Preconditions.checkNotNull(cookie);
100 this.ticker = Preconditions.checkNotNull(ticker);
// Seed lastProgress so that runTimeout() measures inactivity from the moment of construction
101 lastProgress = ticker.read();
// Guard: throws IllegalStateException (via Preconditions.checkState) once notClosed has become false.
108 private void checkNotClosed() {
109 Preconditions.checkState(notClosed, "Queue %s is closed", this);
// Supplies the transmit sequence number handed to SequencedQueueEntry.retransmit() on every transmission.
// NOTE(review): presumably returns txSequence and advances it by one -- confirm against the method body.
112 private long nextTxSequence() {
117 * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
118 * the following scenarios:
119 * 1) The request has been enqueued and transmitted. No further actions are necessary
120 * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
121 * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
122 * process needs to complete before transmission occurs
124 * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
125 * the scenarios above according to the following rules:
126 * - if the returned value is null, the first case applies
127 * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
128 * resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
129 * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
131 * @param request Request to be sent
132 * @param callback Callback to be invoked
133 * @return Optional duration with semantics described above.
// Wraps the request and callback in a SequencedQueueEntry stamped with the current tick, then either
// transmits it straight away or holds it back, depending on backend availability and queue state.
135 @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
138 final long now = ticker.read();
139 final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
// Nothing can be sent without a backend; the empty Optional tells the caller to start backend resolution
140 if (backend == null) {
141 LOG.debug("No backend available, request resolution");
143 return Optional.empty();
// Entries sent to the previous backend still await retransmission, so a new request must not jump ahead of them
145 if (!lastInflight.isEmpty()) {
146 LOG.debug("Retransmit not yet complete, delaying request {}", request);
// The transmit window (lastTxLimit) is already full
150 if (currentInflight.size() >= lastTxLimit) {
151 LOG.debug("Queue is at capacity, delayed sending of request {}", request);
// Fast path: record the entry as in-flight and send it to the current backend immediately
157 currentInflight.offer(e);
158 LOG.debug("Enqueued request {} to queue {}", request, this);
160 e.retransmit(backend, nextTxSequence(), now);
// No timer is outstanding: remember its expiry tick and ask the caller to schedule one
161 if (expectingTimer == null) {
162 expectingTimer = now + REQUEST_TIMEOUT_NANOS;
163 return Optional.of(INITIAL_REQUEST_TIMEOUT);
170 * We are using tri-state return here to indicate one of three conditions:
171 * - if a matching entry is found, return an Optional containing it
172 * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
173 * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
// Walks the given queue in iteration order looking for the entry that the response envelope answers.
// An entry qualifies only if target, logical sequence, session id and transmit sequence all agree.
175 private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
176 final ResponseEnvelope<?> envelope) {
177 // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
178 // to use an iterator
179 final Iterator<SequencedQueueEntry> it = queue.iterator();
180 while (it.hasNext()) {
181 final SequencedQueueEntry e = it.next();
// Verify.verifyNotNull fails with a VerifyException if this entry carries no TxDetails
182 final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
184 final Request<?, ?> request = e.getRequest();
185 final Response<?, ?> response = envelope.getMessage();
187 // First check for matching target, or move to next entry
188 if (!request.getTarget().equals(response.getTarget())) {
// From here on the target matches, so any mismatch means the response conflicts with this entry and is ignored
192 // Sanity-check logical sequence, ignore any out-of-order messages
193 if (request.getSequence() != response.getSequence()) {
194 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
195 return Optional.empty();
198 // Now check session match
199 if (envelope.getSessionId() != txDetails.getSessionId()) {
200 LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
201 return Optional.empty();
// Transmit-sequence mismatch is the only one of these conditions logged at WARN rather than DEBUG
203 if (envelope.getTxSequence() != txDetails.getTxSequence()) {
204 LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
205 return Optional.empty();
// All four identifiers agree: this is the entry being answered
208 LOG.debug("Completing request {} with {}", request, envelope);
210 return Optional.of(e);
// Matches a response envelope against the transmitted requests and completes the matching entry with the
// envelope's message. Requests sent to the current backend are searched first.
216 ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
217 Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
// null means "not in that queue, keep looking", so fall back to requests sent to the previous backend
218 if (maybeEntry == null) {
219 maybeEntry = findMatchingEntry(lastInflight, envelope);
// Either neither queue holds a match, or a conflicting entry signalled that the response must be ignored
222 if (maybeEntry == null || !maybeEntry.isPresent()) {
223 LOG.warn("No request matching {} found, ignoring response", envelope);
// A matched response counts as progress for the no-progress check in runTimeout()
227 lastProgress = ticker.read();
228 final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
230 // We have freed up a slot, try to transmit something
231 if (backend != null) {
// Free transmit slots under the current limit
232 final int toSend = lastTxLimit - currentInflight.size();
// Takes entries off the head of the given queue and (re)transmits each to the current backend with a fresh
// transmit sequence number.
// NOTE(review): presumably bounded by count and returns the unused part of it -- runTransmit() forwards the
// result as the budget for the pending queue; confirm against the full body.
241 private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
245 final SequencedQueueEntry e = queue.poll();
250 LOG.debug("Transmitting entry {}", e);
// lastProgress, not a fresh ticker read, is passed as the transmission timestamp
251 e.retransmit(backend, nextTxSequence(), lastProgress);
// Transmits up to count entries, serving retransmissions from lastInflight before never-sent entries in pending.
258 private void runTransmit(final int count) {
261 // Process lastInflight first, possibly clearing it
262 if (!lastInflight.isEmpty()) {
// Whatever transmitEntries() reports back becomes the budget for the pending queue below
263 toSend = transmitEntries(lastInflight, count);
264 if (lastInflight.isEmpty()) {
265 // We won't be needing the queue anymore, change it to specialized implementation
266 lastInflight = EmptyQueue.getInstance();
272 // Process pending next.
273 transmitEntries(pending, toSend);
// Installs a newly resolved backend and (re)transmits outstanding requests to it.
// Returns an Optional holding a timer delay when the caller has to schedule a timer; the Optional is empty when
// the proof is not the one being waited for, when there is nothing to send, or when a timer is already outstanding.
// A null backend is rejected by Preconditions.checkNotNull (NullPointerException).
276 Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof,
277 final BackendInfo backend) {
278 Preconditions.checkNotNull(backend);
// Only the most recently registered resolution handle (see expectProof()) may update the backend
279 if (!proof.equals(backendProof)) {
280 LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
281 return Optional.empty();
284 LOG.debug("Resolved backend {}", backend);
286 // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
287 // and also not to exceed new limits
// Entries from currentInflight are queued ahead of whatever was already left in lastInflight
288 final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
289 newLast.addAll(currentInflight);
290 newLast.addAll(lastInflight);
291 lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
293 // Clear currentInflight, possibly compacting it
294 final int txLimit = backend.getMaxMessages();
// When the limit shrinks a fresh deque is allocated; otherwise the existing one is cleared and reused
295 if (lastTxLimit > txLimit) {
296 currentInflight = new ArrayDeque<>();
298 currentInflight.clear();
301 // We are ready to roll
302 this.backend = backend;
305 lastTxLimit = txLimit;
// Accepting a backend counts as progress for the no-progress check in runTimeout()
306 lastProgress = ticker.read();
308 // No pending requests, return
309 if (lastInflight.isEmpty() && pending.isEmpty()) {
310 return Optional.empty();
313 LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
315 runTransmit(lastTxLimit);
317 // Calculate next timer if necessary
318 if (expectingTimer == null) {
319 // Request transmission may have cost us some time. Recalculate timeout.
320 final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
321 expectingTimer = nextTicks;
// The delay is measured from lastProgress (taken before transmitting), so it includes the time spent in runTransmit()
322 return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
// A timer is already outstanding, the caller has nothing to schedule
324 return Optional.empty();
// Registers proof as the resolution handle that setBackendInfo() will accept, unless it already is that handle.
// NOTE(review): presumably returns true when the handle was replaced and false when it was already being
// waited for -- confirm against the return statements.
328 boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
329 if (!proof.equals(backendProof)) {
330 LOG.debug("Setting resolution handle to {}", proof);
331 backendProof = proof;
334 LOG.trace("Already resolving handle {}", proof);
// True only when the queue has been closed and no entry remains in any of the three queues.
339 boolean hasCompleted() {
340 return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
344 * Check queue timeouts and return true if a timeout has occurred.
346 * @return True if a timeout occurred
347 * @throws NoProgressException if the queue failed to make progress for an extended
// Timer callback: checks the no-progress limit first, then the per-request timeout of the oldest
// request sent to the current backend.
350 boolean runTimeout() throws NoProgressException {
// The timer has fired, so none is outstanding; enqueueRequest() and setBackendInfo() may arm a new one
351 expectingTimer = null;
352 final long now = ticker.read();
// Lack of progress is only evaluated while at least one request is outstanding in some queue
354 if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
355 final long ticksSinceProgress = now - lastProgress;
356 if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
357 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
358 TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
360 final NoProgressException ex = new NoProgressException(ticksSinceProgress);
366 // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
// peek() yields null when currentInflight is empty, hence the null check below
367 final SequencedQueueEntry head = currentInflight.peek();
368 if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
370 LOG.debug("Queue {} invalidated backend info", this);
// Calls poison(cause) on every entry of the given queue, in iteration order; entries are not removed here.
377 private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
378 queue.forEach(e -> e.poison(cause));
// Poisons every entry in all three queues with the given cause, in this order: requests sent to the current
// backend, requests sent to the previous backend, then requests never transmitted.
382 void poison(final RequestException cause) {
385 poisonQueue(currentInflight, cause);
386 poisonQueue(lastInflight, cause);
387 poisonQueue(pending, cause);
390 // FIXME: add a caller from ClientSingleTransaction