import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
-import java.util.Deque;
+import com.google.common.base.Verify;
+import java.util.ArrayDeque;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Optional;
+import java.util.Queue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
TimeUnit.NANOSECONDS);
/**
- * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can
- * progress at different speeds, these may be completed out of order.
- *
- * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to
- * figure out a more efficient lookup mechanism to deal with responses which do not match the queue
- * order.
+ * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
+ * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
+ * resolution.
*/
- private final Deque<SequencedQueueEntry> queue = new LinkedList<>();
+ private static final int DEFAULT_TX_LIMIT = 1000;
+
private final Ticker ticker;
private final Long cookie;
- // Updated/consulted from actor context only
+ /*
+ * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
+ * with the limit changing between reconnects (which imply retransmission).
+ *
+ * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
+ * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
+ * and one for requests which have not been transmitted at all.
+ *
+ * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
+ * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
+ * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
+ * to retransmit -- hence the second queue.
+ */
+ private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
+ private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
+ private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
+
/**
* Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
* resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
private CompletionStage<? extends BackendInfo> backendProof;
private BackendInfo backend;
+ // This is not final because we need to be able to replace it.
+ private long txSequence;
+
+ private int lastTxLimit = DEFAULT_TX_LIMIT;
+
/**
* Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
*/
Preconditions.checkState(notClosed, "Queue %s is closed", this);
}
+ private long nextTxSequence() {
+ return txSequence++;
+ }
+
/**
* Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
* the following scenarios:
* 1) The request has been enqueued and transmitted. No further actions are necessary
* 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
- * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that
+ * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
* process needs to complete before transmission occurs
*
* These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
* @param callback Callback to be invoked
* @return Optional duration with semantics described above.
*/
- @Nullable Optional<FiniteDuration> enqueueRequest(final long sequence, final Request<?, ?> request,
- final RequestCallback callback) {
+ @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
checkNotClosed();
final long now = ticker.read();
- final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now);
-
- queue.add(e);
- LOG.debug("Enqueued request {} to queue {}", request, this);
-
+ final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
if (backend == null) {
+ LOG.debug("No backend available, request resolution");
+ pending.add(e);
return Optional.empty();
}
+ if (!lastInflight.isEmpty()) {
+ LOG.debug("Retransmit not yet complete, delaying request {}", request);
+ pending.add(e);
+ return null;
+ }
+ if (currentInflight.size() >= lastTxLimit) {
+ LOG.debug("Queue is at capacity, delayed sending of request {}", request);
+ pending.add(e);
+ return null;
+ }
- e.retransmit(backend, now);
+ // Ready to transmit
+ currentInflight.offer(e);
+ LOG.debug("Enqueued request {} to queue {}", request, this);
+
+ e.retransmit(backend, nextTxSequence(), now);
if (expectingTimer == null) {
expectingTimer = now + REQUEST_TIMEOUT_NANOS;
return Optional.of(INITIAL_REQUEST_TIMEOUT);
}
}
- ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
- // Responses to different targets may arrive out of order, hence we use an iterator
+ /*
+ * We are using tri-state return here to indicate one of three conditions:
+ * - if a matching entry is found, return an Optional containing it
+ * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
+ * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
+ */
+ private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
+ final ResponseEnvelope<?> envelope) {
+ // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
+ // to use an iterator
final Iterator<SequencedQueueEntry> it = queue.iterator();
while (it.hasNext()) {
final SequencedQueueEntry e = it.next();
- if (e.acceptsResponse(response)) {
- lastProgress = ticker.read();
- it.remove();
- LOG.debug("Completing request {} with {}", e, response);
- return e.complete(response.getMessage());
+ final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
+
+ final Request<?, ?> request = e.getRequest();
+ final Response<?, ?> response = envelope.getMessage();
+
+ // First check for matching target, or move to next entry
+ if (!request.getTarget().equals(response.getTarget())) {
+ continue;
+ }
+
+ // Sanity-check logical sequence, ignore any out-of-order messages
+ if (request.getSequence() != response.getSequence()) {
+ LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
+ return Optional.empty();
+ }
+
+ // Now check session match
+ if (envelope.getSessionId() != txDetails.getSessionId()) {
+ LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
+ return Optional.empty();
+ }
+ if (envelope.getTxSequence() != txDetails.getTxSequence()) {
+ LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
+ return Optional.empty();
}
+
+ LOG.debug("Completing request {} with {}", request, envelope);
+ it.remove();
+ return Optional.of(e);
}
- LOG.debug("No request matching {} found", response);
- return current;
+ return null;
+ }
+
+ ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
+ Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
+ if (maybeEntry == null) {
+ maybeEntry = findMatchingEntry(lastInflight, envelope);
+ }
+
+ if (maybeEntry == null || !maybeEntry.isPresent()) {
+ LOG.warn("No request matching {} found, ignoring response", envelope);
+ return current;
+ }
+
+ lastProgress = ticker.read();
+ final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
+
+ // We have freed up a slot, try to transmit something
+ if (backend != null) {
+ final int toSend = lastTxLimit - currentInflight.size();
+ if (toSend > 0) {
+ runTransmit(toSend);
+ }
+ }
+
+ return ret;
+ }
+
+ private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
+ int toSend = count;
+
+ while (toSend > 0) {
+ final SequencedQueueEntry e = queue.poll();
+ if (e == null) {
+ break;
+ }
+
+ LOG.debug("Transmitting entry {}", e);
+ e.retransmit(backend, nextTxSequence(), lastProgress);
+ toSend--;
+ }
+
+ return toSend;
+ }
+
+ private void runTransmit(final int count) {
+ final int toSend;
+
+ // Process lastInflight first, possibly clearing it
+ if (!lastInflight.isEmpty()) {
+ toSend = transmitEntries(lastInflight, count);
+ if (lastInflight.isEmpty()) {
+ // We won't be needing the queue anymore, change it to specialized implementation
+ lastInflight = EmptyQueue.getInstance();
+ }
+ } else {
+ toSend = count;
+ }
+
+ // Process pending next.
+ transmitEntries(pending, toSend);
}
Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
+ Preconditions.checkNotNull(backend);
if (!proof.equals(backendProof)) {
LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
return Optional.empty();
}
- this.backend = Preconditions.checkNotNull(backend);
- backendProof = null;
LOG.debug("Resolved backend {}", backend);
- if (queue.isEmpty()) {
- // No pending requests, hence no need for a timer
- return Optional.empty();
+ // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
+ // and also not to exceed new limits
+ final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
+ newLast.addAll(currentInflight);
+ newLast.addAll(lastInflight);
+ lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
+
+ // Clear currentInflight, possibly compacting it
+ final int txLimit = backend.getMaxMessages();
+ if (lastTxLimit > txLimit) {
+ currentInflight = new ArrayDeque<>();
+ } else {
+ currentInflight.clear();
}
- LOG.debug("Resending requests to backend {}", backend);
- final long now = ticker.read();
- for (SequencedQueueEntry e : queue) {
- e.retransmit(backend, now);
- }
+ // We are ready to roll
+ this.backend = backend;
+ backendProof = null;
+ txSequence = 0;
+ lastTxLimit = txLimit;
+ lastProgress = ticker.read();
- if (expectingTimer != null) {
- // We already have a timer going, no need to schedule a new one
+ // No pending requests, return
+ if (lastInflight.isEmpty() && pending.isEmpty()) {
return Optional.empty();
}
- // Above loop may have cost us some time. Recalculate timeout.
- final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
- expectingTimer = nextTicks;
- return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS));
+ LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
+
+ runTransmit(lastTxLimit);
+
+ // Calculate next timer if necessary
+ if (expectingTimer == null) {
+ // Request transmission may have cost us some time. Recalculate timeout.
+ final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
+ expectingTimer = nextTicks;
+ return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
+ } else {
+ return Optional.empty();
+ }
}
boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
}
boolean hasCompleted() {
- return !notClosed && queue.isEmpty();
+ return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
}
/**
expectingTimer = null;
final long now = ticker.read();
- if (!queue.isEmpty()) {
+ if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
final long ticksSinceProgress = now - lastProgress;
if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
}
// We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
- final SequencedQueueEntry head = queue.peek();
+ final SequencedQueueEntry head = currentInflight.peek();
if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
backend = null;
LOG.debug("Queue {} invalidated backend info", this);
}
}
+ private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
+ queue.forEach(e -> e.poison(cause));
+ queue.clear();
+ }
+
void poison(final RequestException cause) {
close();
- SequencedQueueEntry e = queue.poll();
- while (e != null) {
- e.poison(cause);
- e = queue.poll();
- }
+ poisonQueue(currentInflight, cause);
+ poisonQueue(lastInflight, cause);
+ poisonQueue(pending, cause);
}
// FIXME: add a caller from ClientSingleTransaction