2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Ticker;
13 import com.google.common.base.Verify;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.Iterator;
17 import java.util.Optional;
18 import java.util.Queue;
19 import java.util.concurrent.CompletionStage;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nullable;
22 import javax.annotation.concurrent.NotThreadSafe;
23 import org.opendaylight.controller.cluster.access.concepts.Request;
24 import org.opendaylight.controller.cluster.access.concepts.RequestException;
25 import org.opendaylight.controller.cluster.access.concepts.Response;
26 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.duration.FiniteDuration;
32 * A queue that processes entries in sequence.
34 * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
35 * retries and enqueues work as expected.
38 final class SequencedQueue {
39 private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
41 // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
// Fail all outstanding requests if no response has been seen for this long (15 minutes).
43 static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
// Per-request timeout (30 seconds); also the period of the retransmission check timer.
45 static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
46 private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
47 TimeUnit.NANOSECONDS);
50 * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
51 * we will use its advertised {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
54 private static final int DEFAULT_TX_LIMIT = 1000;
// Time source; injected so tests can control the clock.
56 private final Ticker ticker;
// Shard cookie this queue is associated with -- presumably identifies the backend shard; confirm against callers.
57 private final Long cookie;
60 * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
61 * with the limit changing between reconnects (which imply retransmission).
63 * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
64 * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
65 * and one for requests which have not been transmitted at all.
67 * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
68 * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
69 * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
70 * to retransmit -- hence the second queue.
72 private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
73 private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
74 private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
77 * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
78 * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
81 private CompletionStage<? extends BackendInfo> backendProof;
82 private BackendInfo backend;
// NOTE(review): this comment appears to describe the non-final queues above rather than txSequence;
// verify placement against the full source.
84 // This is not final because we need to be able to replace it.
// Monotonic per-transmission sequence counter -- presumably incremented by nextTxSequence(); confirm,
// as that method's body is not visible in this chunk.
85 private long txSequence;
// Current transmit window size; starts at the default and is refreshed from the backend's advertised limit.
87 private int lastTxLimit = DEFAULT_TX_LIMIT;
90 * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
// Holds a boxed deadline (ticker nanos) while a timer is outstanding, null otherwise.
92 private Object expectingTimer;
// Ticker reading at the last observed forward progress; compared against NO_PROGRESS_TIMEOUT_NANOS.
94 private long lastProgress;
96 // Updated from application thread
97 private volatile boolean notClosed = true;
// Creates a queue bound to the given shard cookie, using the supplied ticker as its time source.
// Both arguments must be non-null; the queue is considered to have made progress as of construction.
99 SequencedQueue(final Long cookie, final Ticker ticker) {
100 this.cookie = Preconditions.checkNotNull(cookie);
101 this.ticker = Preconditions.checkNotNull(ticker);
102 lastProgress = ticker.read();
// Guard invoked before mutating operations: throws IllegalStateException once the queue has been closed.
109 private void checkNotClosed() {
110 Preconditions.checkState(notClosed, "Queue %s is closed", this);
// Allocates the next transmit sequence number. NOTE(review): the method body is not visible in this
// chunk -- presumably it returns txSequence++; confirm against the full source.
113 private long nextTxSequence() {
118 * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
119 * the following scenarios:
120 * 1) The request has been enqueued and transmitted. No further actions are necessary
121 * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
122 * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
123 * process needs to complete before transmission occurs
125 * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
126 * the scenarios above according to the following rules:
127 * - if it is null, the first case applies
128 * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
129 * resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
130 * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
132 * @param request Request to be sent
133 * @param callback Callback to be invoked
134 * @return Optional duration with semantics described above.
136 @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
139 final long now = ticker.read();
140 final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
// No backend yet: signal the caller to start backend resolution (case 3 above).
141 if (backend == null) {
142 LOG.debug("No backend available, request resolution");
144 return Optional.empty();
// Retransmission to a new backend is still draining; new requests wait in the pending queue.
146 if (!lastInflight.isEmpty()) {
147 LOG.debug("Retransmit not yet complete, delaying request {}", request);
// Transmit window is full; the request stays queued until a slot frees up in complete().
151 if (currentInflight.size() >= lastTxLimit) {
152 LOG.debug("Queue is at capacity, delayed sending of request {}", request);
159 if (currentInflight.offer(e)) {
160 LOG.debug("Enqueued request {} to queue {}", request, this);
162 // This shouldn't happen since the queue has unlimited capacity but check anyway to avoid FindBugs warning
163 // about checking return value.
// NOTE(review): log message has a grammar error ("Fail to enqueued") -- should read "Failed to enqueue".
164 LOG.warn("Fail to enqueued request {} to queue {}", request, this);
// Actually send the request to the backend with a fresh transmit sequence.
167 e.retransmit(backend, nextTxSequence(), now);
// First in-flight request: arm the timeout timer and tell the caller to schedule it (case 2 above).
168 if (expectingTimer == null) {
169 expectingTimer = now + REQUEST_TIMEOUT_NANOS;
170 return Optional.of(INITIAL_REQUEST_TIMEOUT);
177 * We are using tri-state return here to indicate one of three conditions:
178 * - if a matching entry is found, return an Optional containing it
179 * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
180 * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
182 @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
183 justification = "Returning null Optional is documented in the API contract.")
184 private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
185 final ResponseEnvelope<?> envelope) {
186 // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
187 // to use an iterator
188 final Iterator<SequencedQueueEntry> it = queue.iterator();
189 while (it.hasNext()) {
190 final SequencedQueueEntry e = it.next();
// Entries in transmitted queues must carry transmission details; a null here indicates a bug.
191 final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
193 final Request<?, ?> request = e.getRequest();
194 final Response<?, ?> response = envelope.getMessage();
196 // First check for matching target, or move to next entry
197 if (!request.getTarget().equals(response.getTarget())) {
201 // Sanity-check logical sequence, ignore any out-of-order messages
202 if (request.getSequence() != response.getSequence()) {
203 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
204 return Optional.empty();
207 // Now check session match
208 if (envelope.getSessionId() != txDetails.getSessionId()) {
209 LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
210 return Optional.empty();
212 if (envelope.getTxSequence() != txDetails.getTxSequence()) {
213 LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
214 return Optional.empty();
// All identifiers match: this entry is the one the response completes.
217 LOG.debug("Completing request {} with {}", request, envelope);
219 return Optional.of(e);
// Matches an incoming response envelope against the in-flight queues, completes the matching entry,
// records progress, and (tail not visible in this chunk) transmits queued requests into the freed slot.
// Returns the behavior produced by the entry's callback.
225 ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
// Search current in-flight first; null means "keep looking", so fall through to the previous backend's queue.
226 Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
227 if (maybeEntry == null) {
228 maybeEntry = findMatchingEntry(lastInflight, envelope);
// Either no match anywhere (null) or a conflicting/out-of-order response (empty): ignore the envelope.
231 if (maybeEntry == null || !maybeEntry.isPresent()) {
232 LOG.warn("No request matching {} found, ignoring response", envelope);
// A response arrived, hence the queue has made progress.
236 lastProgress = ticker.read();
237 final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
239 // We have freed up a slot, try to transmit something
240 if (backend != null) {
241 final int toSend = lastTxLimit - currentInflight.size();
// Drains up to count entries from the given queue and transmits each to the current backend.
// NOTE(review): loop structure and return value are truncated in this chunk -- presumably returns the
// number of permits left over when the queue empties before count is exhausted; confirm against full source.
250 private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
254 final SequencedQueueEntry e = queue.poll();
259 LOG.debug("Transmitting entry {}", e);
260 e.retransmit(backend, nextTxSequence(), lastProgress);
// Transmits up to count entries, retransmitting the previous backend's queue first and only then
// servicing fresh pending requests, preserving the ordering contract described on the queue fields.
267 private void runTransmit(final int count) {
270 // Process lastInflight first, possibly clearing it
271 if (!lastInflight.isEmpty()) {
272 toSend = transmitEntries(lastInflight, count);
273 if (lastInflight.isEmpty()) {
274 // We won't be needing the queue anymore, change it to specialized implementation
275 lastInflight = EmptyQueue.getInstance();
281 // Process pending next.
// Spend whatever permits remain on requests that have never been transmitted.
282 transmitEntries(pending, toSend);
// Installs a newly-resolved backend, merges the in-flight queues for retransmission, adopts the
// backend's advertised transmit limit, and kicks off transmission. Returns a timer duration the
// caller must schedule, or an empty Optional when no timer is needed (or the proof is stale).
285 Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof,
286 final BackendInfo backend) {
287 Preconditions.checkNotNull(backend);
// Only honor the resolution we most recently asked for; stale proofs are ignored.
288 if (!proof.equals(backendProof)) {
289 LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
290 return Optional.empty();
293 LOG.debug("Resolved backend {}", backend);
295 // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
296 // and also not to exceed new limits
297 final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
298 newLast.addAll(currentInflight);
299 newLast.addAll(lastInflight);
300 lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
302 // Clear currentInflight, possibly compacting it
303 final int txLimit = backend.getMaxMessages();
// If the window shrank, drop the old (over-sized) backing array by allocating a fresh deque.
304 if (lastTxLimit > txLimit) {
305 currentInflight = new ArrayDeque<>();
307 currentInflight.clear();
310 // We are ready to roll
311 this.backend = backend;
314 lastTxLimit = txLimit;
// Resolving a backend counts as progress.
315 lastProgress = ticker.read();
317 // No pending requests, return
318 if (lastInflight.isEmpty() && pending.isEmpty()) {
319 return Optional.empty();
322 LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
324 runTransmit(lastTxLimit);
326 // Calculate next timer if necessary
327 if (expectingTimer == null) {
328 // Request transmission may have cost us some time. Recalculate timeout.
329 final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
330 expectingTimer = nextTicks;
331 return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
333 return Optional.empty();
// Records the backend-resolution handle we are waiting on, so that only the matching
// setBackendInfo() call is honored. NOTE(review): return statements are not visible in this
// chunk -- presumably returns true when a new handle was installed, false when already resolving.
337 boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
338 if (!proof.equals(backendProof)) {
339 LOG.debug("Setting resolution handle to {}", proof);
340 backendProof = proof;
343 LOG.trace("Already resolving handle {}", proof);
// True once the queue has been closed AND all three queues have fully drained.
348 boolean hasCompleted() {
349 return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
353 * Check queue timeouts and return true if a timeout has occurred.
355 * @return True if a timeout occurred
356 * @throws NoProgressException if the queue failed to make progress for an extended
359 boolean runTimeout() throws NoProgressException {
// The timer that called us has fired; clear it so a new one can be scheduled.
360 expectingTimer = null;
361 final long now = ticker.read();
// Only consider timeouts while there is outstanding work.
363 if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
364 final long ticksSinceProgress = now - lastProgress;
365 if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
366 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
367 TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
369 final NoProgressException ex = new NoProgressException(ticksSinceProgress);
375 // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
376 final SequencedQueueEntry head = currentInflight.peek();
// Head timed out: the backend is presumed unreachable, so invalidate it and force re-resolution.
377 if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
379 LOG.debug("Queue {} invalidated backend info", this);
// Fails every entry in the given queue with the supplied cause.
386 private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
387 queue.forEach(e -> e.poison(cause));
// Permanently fails this queue: every outstanding entry in all three queues is completed
// with the given cause. NOTE(review): lines marking the queue closed (clearing notClosed) are
// not visible in this chunk -- confirm against the full source.
391 void poison(final RequestException cause) {
394 poisonQueue(currentInflight, cause);
395 poisonQueue(lastInflight, cause);
396 poisonQueue(pending, cause);
399 // FIXME: add a caller from ClientSingleTransaction