X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FSequencedQueue.java;h=596e353c981f3d21d193c66795753233c40bb6eb;hp=513a3f936b9ddabac4131d519f82ccf3461e89d3;hb=5fd8e6506248cc34da72281a1662612f6c2b2f9a;hpb=b0067e0a4bfa955f15c6259e019f954687264eff diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java index 513a3f936b..596e353c98 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java @@ -10,22 +10,27 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; -import java.util.Deque; +import com.google.common.base.Verify; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayDeque; import java.util.Iterator; -import java.util.LinkedList; import java.util.Optional; +import java.util.Queue; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; /* + * A queue that processes entries in sequence. + * * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts, * retries and enqueues work as expected. */ @@ -42,18 +47,32 @@ final class SequencedQueue { TimeUnit.NANOSECONDS); /** - * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can - * progress at different speeds, these may be completed out of order. - * - * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to - * figure out a more efficient lookup mechanism to deal with responses which do not match the queue - * order. + * Default number of permits we start with. This value is used when we start up only, once we resolve a backend + * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful + * resolution. */ - private final Deque queue = new LinkedList<>(); + private static final int DEFAULT_TX_LIMIT = 1000; + private final Ticker ticker; private final Long cookie; - // Updated/consulted from actor context only + /* + * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing + * with the limit changing between reconnects (which imply retransmission). 
+ * + * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one), + * one for requests that have been sent to the previous backend (and have not been transmitted to the current one), + * and one for requests which have not been transmitted at all. + * + * When transmitting we first try to drain the second queue and service the third one only when that becomes empty. + * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses + * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance + * to retransmit -- hence the second queue. + */ + private Queue currentInflight = new ArrayDeque<>(); + private Queue lastInflight = new ArrayDeque<>(); + private final Queue pending = new ArrayDeque<>(); + /** * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested @@ -62,6 +81,11 @@ final class SequencedQueue { private CompletionStage backendProof; private BackendInfo backend; + // This is not final because we need to be able to replace it. + private long txSequence; + + private int lastTxLimit = DEFAULT_TX_LIMIT; + /** * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue. */ @@ -86,14 +110,18 @@ final class SequencedQueue { Preconditions.checkState(notClosed, "Queue %s is closed", this); } + private long nextTxSequence() { + return txSequence++; + } + /** * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller * the following scenarios: * 1) The request has been enqueued and transmitted. No further actions are necessary * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer - * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that + * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that * process needs to complete before transmission occurs - * + *
* These options are covered via returning an {@link Optional}. The caller needs to examine it and decode * the scenarios above according to the following rules: * - if is null, the first case applies @@ -105,21 +133,38 @@ final class SequencedQueue { * @param callback Callback to be invoked * @return Optional duration with semantics described above. */ - @Nullable Optional enqueueRequest(final long sequence, final Request request, - final RequestCallback callback) { + @Nullable Optional enqueueRequest(final Request request, final RequestCallback callback) { checkNotClosed(); final long now = ticker.read(); - final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now); - - queue.add(e); - LOG.debug("Enqueued request {} to queue {}", request, this); - + final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now); if (backend == null) { + LOG.debug("No backend available, request resolution"); + pending.add(e); return Optional.empty(); } + if (!lastInflight.isEmpty()) { + LOG.debug("Retransmit not yet complete, delaying request {}", request); + pending.add(e); + return null; + } + if (currentInflight.size() >= lastTxLimit) { + LOG.debug("Queue is at capacity, delayed sending of request {}", request); + pending.add(e); + return null; + } - e.retransmit(backend, now); + // Ready to transmit + + if (currentInflight.offer(e)) { + LOG.debug("Enqueued request {} to queue {}", request, this); + } else { + // This shouldn't happen since the queue has unlimited capacity but check anyway to avoid FindBugs warning + // about checking return value. + LOG.warn("Fail to enqueued request {} to queue {}", request, this); + } + + e.retransmit(backend, nextTxSequence(), now); if (expectingTimer == null) { expectingTimer = now + REQUEST_TIMEOUT_NANOS; return Optional.of(INITIAL_REQUEST_TIMEOUT); @@ -128,53 +173,165 @@ final class SequencedQueue { } } - ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope response) { - // Responses to different targets may arrive out of order, hence we use an iterator + /* + * We are using tri-state return here to indicate one of three conditions: + * - if a matching entry is found, return an Optional containing it + * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null + * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional + */ + @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", + justification = "Returning null Optional is documented in the API contract.") + private static Optional findMatchingEntry(final Queue queue, + final ResponseEnvelope envelope) { + // Try to find the request in a queue. 
Responses may legally come back in a different order, hence we need + // to use an iterator final Iterator it = queue.iterator(); while (it.hasNext()) { final SequencedQueueEntry e = it.next(); - if (e.acceptsResponse(response)) { - lastProgress = ticker.read(); - it.remove(); - LOG.debug("Completing request {} with {}", e, response); - return e.complete(response.getMessage()); + final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails()); + + final Request request = e.getRequest(); + final Response response = envelope.getMessage(); + + // First check for matching target, or move to next entry + if (!request.getTarget().equals(response.getTarget())) { + continue; + } + + // Sanity-check logical sequence, ignore any out-of-order messages + if (request.getSequence() != response.getSequence()) { + LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope); + return Optional.empty(); + } + + // Now check session match + if (envelope.getSessionId() != txDetails.getSessionId()) { + LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope); + return Optional.empty(); + } + if (envelope.getTxSequence() != txDetails.getTxSequence()) { + LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope); + return Optional.empty(); + } + + LOG.debug("Completing request {} with {}", request, envelope); + it.remove(); + return Optional.of(e); + } + + return null; + } + + ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope envelope) { + Optional maybeEntry = findMatchingEntry(currentInflight, envelope); + if (maybeEntry == null) { + maybeEntry = findMatchingEntry(lastInflight, envelope); + } + + if (maybeEntry == null || !maybeEntry.isPresent()) { + LOG.warn("No request matching {} found, ignoring response", envelope); + return current; + } + + lastProgress = ticker.read(); + final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage()); + + // We have freed up a slot, try to transmit something + if (backend != null) { + final int toSend = lastTxLimit - currentInflight.size(); + if (toSend > 0) { + runTransmit(toSend); + } + } + + return ret; + } + + private int transmitEntries(final Queue queue, final int count) { + int toSend = count; + + while (toSend > 0) { + final SequencedQueueEntry e = queue.poll(); + if (e == null) { + break; + } + + LOG.debug("Transmitting entry {}", e); + e.retransmit(backend, nextTxSequence(), lastProgress); + toSend--; + } + + return toSend; + } + + private void runTransmit(final int count) { + final int toSend; + + // Process lastInflight first, possibly clearing it + if (!lastInflight.isEmpty()) { + toSend = transmitEntries(lastInflight, count); + if (lastInflight.isEmpty()) { + // We won't be needing the queue anymore, change it to specialized implementation + lastInflight = EmptyQueue.getInstance(); } + } else { + toSend = count; } - LOG.debug("No request matching {} found", response); - return current; + // Process pending next. 
+ transmitEntries(pending, toSend); } - Optional setBackendInfo(final CompletionStage proof, final BackendInfo backend) { + Optional setBackendInfo(final CompletionStage proof, + final BackendInfo backend) { + Preconditions.checkNotNull(backend); if (!proof.equals(backendProof)) { LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof); return Optional.empty(); } - this.backend = Preconditions.checkNotNull(backend); - backendProof = null; LOG.debug("Resolved backend {}", backend); - if (queue.isEmpty()) { - // No pending requests, hence no need for a timer - return Optional.empty(); + // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right + // and also not to exceed new limits + final Queue newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size()); + newLast.addAll(currentInflight); + newLast.addAll(lastInflight); + lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast; + + // Clear currentInflight, possibly compacting it + final int txLimit = backend.getMaxMessages(); + if (lastTxLimit > txLimit) { + currentInflight = new ArrayDeque<>(); + } else { + currentInflight.clear(); } - LOG.debug("Resending requests to backend {}", backend); - final long now = ticker.read(); - for (SequencedQueueEntry e : queue) { - e.retransmit(backend, now); - } + // We are ready to roll + this.backend = backend; + backendProof = null; + txSequence = 0; + lastTxLimit = txLimit; + lastProgress = ticker.read(); - if (expectingTimer != null) { - // We already have a timer going, no need to schedule a new one + // No pending requests, return + if (lastInflight.isEmpty() && pending.isEmpty()) { return Optional.empty(); } - // Above loop may have cost us some time. Recalculate timeout. - final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS; - expectingTimer = nextTicks; - return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS)); + LOG.debug("Sending up to {} requests to backend {}", txLimit, backend); + + runTransmit(lastTxLimit); + + // Calculate next timer if necessary + if (expectingTimer == null) { + // Request transmission may have cost us some time. Recalculate timeout. 
+ final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS; + expectingTimer = nextTicks; + return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS)); + } else { + return Optional.empty(); + } } boolean expectProof(final CompletionStage proof) { @@ -189,7 +346,7 @@ final class SequencedQueue { } boolean hasCompleted() { - return !notClosed && queue.isEmpty(); + return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty(); } /** @@ -203,7 +360,7 @@ final class SequencedQueue { expectingTimer = null; final long now = ticker.read(); - if (!queue.isEmpty()) { + if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) { final long ticksSinceProgress = now - lastProgress; if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, @@ -216,7 +373,7 @@ final class SequencedQueue { } // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue - final SequencedQueueEntry head = queue.peek(); + final SequencedQueueEntry head = currentInflight.peek(); if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) { backend = null; LOG.debug("Queue {} invalidated backend info", this); @@ -226,14 +383,17 @@ final class SequencedQueue { } } + private static void poisonQueue(final Queue queue, final RequestException cause) { + queue.forEach(e -> e.poison(cause)); + queue.clear(); + } + void poison(final RequestException cause) { close(); - SequencedQueueEntry e = queue.poll(); - while (e != null) { - e.poison(cause); - e = queue.poll(); - } + poisonQueue(currentInflight, cause); + poisonQueue(lastInflight, cause); + poisonQueue(pending, cause); } // FIXME: add a caller from ClientSingleTransaction
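
The comment blocks added by this patch describe three cooperating queues (currentInflight, lastInflight, pending), a per-backend transmit limit taken from BackendInfo#getMaxMessages(), and the tri-state Optional contract of enqueueRequest(). The standalone sketch below is illustration only, not part of the patch: DemoQueue, plain String requests, a simple boolean in place of the real timer deadline, and java.time.Duration instead of scala's FiniteDuration are all assumed names and simplifications.

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

final class DemoQueue {
    // Requests transmitted to the current backend, never more than txLimit of them
    private Queue<String> currentInflight = new ArrayDeque<>();
    // Requests transmitted to a previous backend, waiting to be retransmitted
    private Queue<String> lastInflight = new ArrayDeque<>();
    // Requests which have not been transmitted at all
    private final Queue<String> pending = new ArrayDeque<>();

    private boolean haveBackend;
    private boolean timerScheduled;
    private int txLimit = 1000;
    private long txSequence;

    /*
     * Tri-state result mirroring enqueueRequest():
     *   - Optional.of(duration): transmitted, the caller must schedule a timer
     *   - null: enqueued or transmitted, no further action needed
     *   - Optional.empty(): the caller must first resolve backend information
     */
    Optional<Duration> enqueue(final String request) {
        if (!haveBackend) {
            pending.add(request);
            return Optional.empty();
        }
        if (!lastInflight.isEmpty() || currentInflight.size() >= txLimit) {
            // Retransmission still in progress, or at capacity: delay this request
            pending.add(request);
            return null;
        }

        transmit(request);
        if (timerScheduled) {
            return null;
        }
        timerScheduled = true;
        return Optional.of(Duration.ofSeconds(30));
    }

    // A matching response frees a slot in currentInflight, letting delayed requests go out
    void complete(final String request) {
        if (currentInflight.remove(request)) {
            sendUpTo(txLimit - currentInflight.size());
        }
    }

    // On (re)connect everything in flight needs to be retransmitted, in order, subject to the
    // limit advertised by the new backend
    void backendResolved(final int maxMessages) {
        final Queue<String> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
        newLast.addAll(currentInflight);
        newLast.addAll(lastInflight);
        lastInflight = newLast;
        currentInflight = new ArrayDeque<>();

        haveBackend = true;
        txLimit = maxMessages;
        txSequence = 0;
        sendUpTo(txLimit);
    }

    // Drain lastInflight before pending, never exceeding the available permits
    private void sendUpTo(final int permits) {
        int remaining = permits;
        while (remaining > 0 && !lastInflight.isEmpty()) {
            transmit(lastInflight.poll());
            remaining--;
        }
        while (remaining > 0 && !pending.isEmpty()) {
            transmit(pending.poll());
            remaining--;
        }
    }

    private void transmit(final String request) {
        currentInflight.add(request);
        System.out.println("txSequence " + txSequence++ + ": " + request);
    }

    public static void main(final String[] args) {
        final DemoQueue queue = new DemoQueue();
        queue.enqueue("tx-1");        // Optional.empty(): backend needs to be resolved first
        queue.backendResolved(2);     // backend advertises a limit of 2, tx-1 goes out
        queue.enqueue("tx-2");        // transmitted, Optional.of(timeout): schedule a timer
        queue.enqueue("tx-3");        // at capacity, delayed (returns null)
        queue.complete("tx-1");       // frees a slot, tx-3 is transmitted
    }
}

A caller would react to the three outcomes as the javadoc above prescribes: schedule a timer when a duration is returned, request backend resolution on an empty Optional, and do nothing on null.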
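
findMatchingEntry() in the patch uses a similar tri-state return to distinguish "found", "keep looking in the other in-flight queue" and "conflicting response, ignore it". The sketch below is a simplified model of that contract; the Sent and Envelope records are hypothetical stand-ins for SequencedQueueEntry with its TxDetails and for Response/ResponseEnvelope, and the real code additionally logs each mismatch.

import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;

final class MatchSketch {
    record Sent(String target, long sequence, long sessionId, long txSequence) { }

    record Envelope(String target, long sequence, long sessionId, long txSequence) { }

    /*
     * Tri-state result mirroring findMatchingEntry():
     *   - Optional.of(entry): a matching in-flight entry was found (and removed)
     *   - null: no match in this queue, the caller should try the other in-flight queue
     *   - Optional.empty(): the response conflicts with what was sent and must be ignored
     */
    static Optional<Sent> findMatchingEntry(final Queue<Sent> queue, final Envelope env) {
        final Iterator<Sent> it = queue.iterator();
        while (it.hasNext()) {
            final Sent e = it.next();
            if (!e.target().equals(env.target())) {
                // Responses may come back out of order: keep scanning for the matching target
                continue;
            }
            if (e.sequence() != env.sequence() || e.sessionId() != env.sessionId()
                    || e.txSequence() != env.txSequence()) {
                // Same target, but the sequencing or session does not line up: ignore the response
                return Optional.empty();
            }
            it.remove();
            return Optional.of(e);
        }
        return null;
    }
}

In the patch, complete() consults currentInflight first and falls back to lastInflight only when the first lookup returns null; an empty Optional ends the search and the response is dropped.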