X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FSequencedQueue.java;fp=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FSequencedQueue.java;h=0000000000000000000000000000000000000000;hp=596e353c981f3d21d193c66795753233c40bb6eb;hb=320a4e5cd2d9d80468a3f82798744f2035488218;hpb=5fd8e6506248cc34da72281a1662612f6c2b2f9a diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java deleted file mode 100644 index 596e353c98..0000000000 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java +++ /dev/null @@ -1,403 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.access.client; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Ticker; -import com.google.common.base.Verify; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayDeque; -import java.util.Iterator; -import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.cluster.access.concepts.Request; -import org.opendaylight.controller.cluster.access.concepts.RequestException; -import org.opendaylight.controller.cluster.access.concepts.Response; -import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; - -/* - * A queue that processes entries in sequence. - * - * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts, - * retries and enqueues work as expected. - */ -@NotThreadSafe -final class SequencedQueue { - private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class); - - // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path - @VisibleForTesting - static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15); - @VisibleForTesting - static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30); - private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS, - TimeUnit.NANOSECONDS); - - /** - * Default number of permits we start with. This value is used when we start up only, once we resolve a backend - * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful - * resolution. - */ - private static final int DEFAULT_TX_LIMIT = 1000; - - private final Ticker ticker; - private final Long cookie; - - /* - * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing - * with the limit changing between reconnects (which imply retransmission). - * - * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one), - * one for requests that have been sent to the previous backend (and have not been transmitted to the current one), - * and one for requests which have not been transmitted at all. - * - * When transmitting we first try to drain the second queue and service the third one only when that becomes empty. - * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses - * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance - * to retransmit -- hence the second queue. - */ - private Queue currentInflight = new ArrayDeque<>(); - private Queue lastInflight = new ArrayDeque<>(); - private final Queue pending = new ArrayDeque<>(); - - /** - * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when - * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested - * result. - */ - private CompletionStage backendProof; - private BackendInfo backend; - - // This is not final because we need to be able to replace it. - private long txSequence; - - private int lastTxLimit = DEFAULT_TX_LIMIT; - - /** - * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue. - */ - private Object expectingTimer; - - private long lastProgress; - - // Updated from application thread - private volatile boolean notClosed = true; - - SequencedQueue(final Long cookie, final Ticker ticker) { - this.cookie = Preconditions.checkNotNull(cookie); - this.ticker = Preconditions.checkNotNull(ticker); - lastProgress = ticker.read(); - } - - Long getCookie() { - return cookie; - } - - private void checkNotClosed() { - Preconditions.checkState(notClosed, "Queue %s is closed", this); - } - - private long nextTxSequence() { - return txSequence++; - } - - /** - * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller - * the following scenarios: - * 1) The request has been enqueued and transmitted. No further actions are necessary - * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer - * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that - * process needs to complete before transmission occurs - *

- * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode - * the scenarios above according to the following rules: - * - if is null, the first case applies - * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend - * resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)} - * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer - * - * @param request Request to be sent - * @param callback Callback to be invoked - * @return Optional duration with semantics described above. - */ - @Nullable Optional enqueueRequest(final Request request, final RequestCallback callback) { - checkNotClosed(); - - final long now = ticker.read(); - final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now); - if (backend == null) { - LOG.debug("No backend available, request resolution"); - pending.add(e); - return Optional.empty(); - } - if (!lastInflight.isEmpty()) { - LOG.debug("Retransmit not yet complete, delaying request {}", request); - pending.add(e); - return null; - } - if (currentInflight.size() >= lastTxLimit) { - LOG.debug("Queue is at capacity, delayed sending of request {}", request); - pending.add(e); - return null; - } - - // Ready to transmit - - if (currentInflight.offer(e)) { - LOG.debug("Enqueued request {} to queue {}", request, this); - } else { - // This shouldn't happen since the queue has unlimited capacity but check anyway to avoid FindBugs warning - // about checking return value. - LOG.warn("Fail to enqueued request {} to queue {}", request, this); - } - - e.retransmit(backend, nextTxSequence(), now); - if (expectingTimer == null) { - expectingTimer = now + REQUEST_TIMEOUT_NANOS; - return Optional.of(INITIAL_REQUEST_TIMEOUT); - } else { - return null; - } - } - - /* - * We are using tri-state return here to indicate one of three conditions: - * - if a matching entry is found, return an Optional containing it - * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null - * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional - */ - @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", - justification = "Returning null Optional is documented in the API contract.") - private static Optional findMatchingEntry(final Queue queue, - final ResponseEnvelope envelope) { - // Try to find the request in a queue. Responses may legally come back in a different order, hence we need - // to use an iterator - final Iterator it = queue.iterator(); - while (it.hasNext()) { - final SequencedQueueEntry e = it.next(); - final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails()); - - final Request request = e.getRequest(); - final Response response = envelope.getMessage(); - - // First check for matching target, or move to next entry - if (!request.getTarget().equals(response.getTarget())) { - continue; - } - - // Sanity-check logical sequence, ignore any out-of-order messages - if (request.getSequence() != response.getSequence()) { - LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope); - return Optional.empty(); - } - - // Now check session match - if (envelope.getSessionId() != txDetails.getSessionId()) { - LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope); - return Optional.empty(); - } - if (envelope.getTxSequence() != txDetails.getTxSequence()) { - LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope); - return Optional.empty(); - } - - LOG.debug("Completing request {} with {}", request, envelope); - it.remove(); - return Optional.of(e); - } - - return null; - } - - ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope envelope) { - Optional maybeEntry = findMatchingEntry(currentInflight, envelope); - if (maybeEntry == null) { - maybeEntry = findMatchingEntry(lastInflight, envelope); - } - - if (maybeEntry == null || !maybeEntry.isPresent()) { - LOG.warn("No request matching {} found, ignoring response", envelope); - return current; - } - - lastProgress = ticker.read(); - final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage()); - - // We have freed up a slot, try to transmit something - if (backend != null) { - final int toSend = lastTxLimit - currentInflight.size(); - if (toSend > 0) { - runTransmit(toSend); - } - } - - return ret; - } - - private int transmitEntries(final Queue queue, final int count) { - int toSend = count; - - while (toSend > 0) { - final SequencedQueueEntry e = queue.poll(); - if (e == null) { - break; - } - - LOG.debug("Transmitting entry {}", e); - e.retransmit(backend, nextTxSequence(), lastProgress); - toSend--; - } - - return toSend; - } - - private void runTransmit(final int count) { - final int toSend; - - // Process lastInflight first, possibly clearing it - if (!lastInflight.isEmpty()) { - toSend = transmitEntries(lastInflight, count); - if (lastInflight.isEmpty()) { - // We won't be needing the queue anymore, change it to specialized implementation - lastInflight = EmptyQueue.getInstance(); - } - } else { - toSend = count; - } - - // Process pending next. - transmitEntries(pending, toSend); - } - - Optional setBackendInfo(final CompletionStage proof, - final BackendInfo backend) { - Preconditions.checkNotNull(backend); - if (!proof.equals(backendProof)) { - LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof); - return Optional.empty(); - } - - LOG.debug("Resolved backend {}", backend); - - // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right - // and also not to exceed new limits - final Queue newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size()); - newLast.addAll(currentInflight); - newLast.addAll(lastInflight); - lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast; - - // Clear currentInflight, possibly compacting it - final int txLimit = backend.getMaxMessages(); - if (lastTxLimit > txLimit) { - currentInflight = new ArrayDeque<>(); - } else { - currentInflight.clear(); - } - - // We are ready to roll - this.backend = backend; - backendProof = null; - txSequence = 0; - lastTxLimit = txLimit; - lastProgress = ticker.read(); - - // No pending requests, return - if (lastInflight.isEmpty() && pending.isEmpty()) { - return Optional.empty(); - } - - LOG.debug("Sending up to {} requests to backend {}", txLimit, backend); - - runTransmit(lastTxLimit); - - // Calculate next timer if necessary - if (expectingTimer == null) { - // Request transmission may have cost us some time. Recalculate timeout. - final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS; - expectingTimer = nextTicks; - return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS)); - } else { - return Optional.empty(); - } - } - - boolean expectProof(final CompletionStage proof) { - if (!proof.equals(backendProof)) { - LOG.debug("Setting resolution handle to {}", proof); - backendProof = proof; - return true; - } else { - LOG.trace("Already resolving handle {}", proof); - return false; - } - } - - boolean hasCompleted() { - return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty(); - } - - /** - * Check queue timeouts and return true if a timeout has occured. - * - * @return True if a timeout occured - * @throws NoProgressException if the queue failed to make progress for an extended - * time. - */ - boolean runTimeout() throws NoProgressException { - expectingTimer = null; - final long now = ticker.read(); - - if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) { - final long ticksSinceProgress = now - lastProgress; - if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { - LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, - TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress)); - - final NoProgressException ex = new NoProgressException(ticksSinceProgress); - poison(ex); - throw ex; - } - } - - // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue - final SequencedQueueEntry head = currentInflight.peek(); - if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) { - backend = null; - LOG.debug("Queue {} invalidated backend info", this); - return true; - } else { - return false; - } - } - - private static void poisonQueue(final Queue queue, final RequestException cause) { - queue.forEach(e -> e.poison(cause)); - queue.clear(); - } - - void poison(final RequestException cause) { - close(); - - poisonQueue(currentInflight, cause); - poisonQueue(lastInflight, cause); - poisonQueue(pending, cause); - } - - // FIXME: add a caller from ClientSingleTransaction - void close() { - notClosed = false; - } -}