BUG-5280: add AbstractClientConnection
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / SequencedQueue.java
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java
deleted file mode 100644 (file)
index 596e353..0000000
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.access.client;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
-import com.google.common.base.Verify;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.cluster.access.concepts.Request;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
-
-/*
- * A queue that processes entries in sequence.
- *
- * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
- *       retries and enqueues work as expected.
- */
-@NotThreadSafe
-final class SequencedQueue {
-    private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
-
-    // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
-    @VisibleForTesting
-    static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
-    @VisibleForTesting
-    static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
-    private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
-        TimeUnit.NANOSECONDS);
-
-    /**
-     * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
-     * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
-     * resolution.
-     */
-    private static final int DEFAULT_TX_LIMIT = 1000;
-
-    private final Ticker ticker;
-    private final Long cookie;
-
-    /*
-     * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
-     * with the limit changing between reconnects (which imply retransmission).
-     *
-     * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
-     * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
-     * and one for requests which have not been transmitted at all.
-     *
-     * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
-     * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
-     * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
-     * to retransmit -- hence the second queue.
-     */
-    private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
-    private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
-    private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
-
-    /**
-     * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
-     * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
-     * result.
-     */
-    private CompletionStage<? extends BackendInfo> backendProof;
-    private BackendInfo backend;
-
-    // This is not final because we need to be able to replace it.
-    private long txSequence;
-
-    private int lastTxLimit = DEFAULT_TX_LIMIT;
-
-    /**
-     * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
-     */
-    private Object expectingTimer;
-
-    private long lastProgress;
-
-    // Updated from application thread
-    private volatile boolean notClosed = true;
-
-    SequencedQueue(final Long cookie, final Ticker ticker) {
-        this.cookie = Preconditions.checkNotNull(cookie);
-        this.ticker = Preconditions.checkNotNull(ticker);
-        lastProgress = ticker.read();
-    }
-
-    Long getCookie() {
-        return cookie;
-    }
-
-    private void checkNotClosed() {
-        Preconditions.checkState(notClosed, "Queue %s is closed", this);
-    }
-
-    private long nextTxSequence() {
-        return txSequence++;
-    }
-
-    /**
-     * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
-     * the following scenarios:
-     * 1) The request has been enqueued and transmitted. No further actions are necessary
-     * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
-     * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
-     *    process needs to complete before transmission occurs
-     * <p/>
-     * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
-     * the scenarios above according to the following rules:
-     * - if is null, the first case applies
-     * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
-     *      resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
-     * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
-     *
-     * @param request Request to be sent
-     * @param callback Callback to be invoked
-     * @return Optional duration with semantics described above.
-     */
-    @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
-        checkNotClosed();
-
-        final long now = ticker.read();
-        final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
-        if (backend == null) {
-            LOG.debug("No backend available, request resolution");
-            pending.add(e);
-            return Optional.empty();
-        }
-        if (!lastInflight.isEmpty()) {
-            LOG.debug("Retransmit not yet complete, delaying request {}", request);
-            pending.add(e);
-            return null;
-        }
-        if (currentInflight.size() >= lastTxLimit) {
-            LOG.debug("Queue is at capacity, delayed sending of request {}", request);
-            pending.add(e);
-            return null;
-        }
-
-        // Ready to transmit
-
-        if (currentInflight.offer(e)) {
-            LOG.debug("Enqueued request {} to queue {}", request, this);
-        } else {
-            // This shouldn't happen since the queue has unlimited capacity but check anyway to avoid FindBugs warning
-            // about checking return value.
-            LOG.warn("Fail to enqueued request {} to queue {}", request, this);
-        }
-
-        e.retransmit(backend, nextTxSequence(), now);
-        if (expectingTimer == null) {
-            expectingTimer = now + REQUEST_TIMEOUT_NANOS;
-            return Optional.of(INITIAL_REQUEST_TIMEOUT);
-        } else {
-            return null;
-        }
-    }
-
-    /*
-     * We are using tri-state return here to indicate one of three conditions:
-     * - if a matching entry is found, return an Optional containing it
-     * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
-     * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
-     */
-    @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
-            justification = "Returning null Optional is documented in the API contract.")
-    private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
-            final ResponseEnvelope<?> envelope) {
-        // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
-        // to use an iterator
-        final Iterator<SequencedQueueEntry> it = queue.iterator();
-        while (it.hasNext()) {
-            final SequencedQueueEntry e = it.next();
-            final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
-
-            final Request<?, ?> request = e.getRequest();
-            final Response<?, ?> response = envelope.getMessage();
-
-            // First check for matching target, or move to next entry
-            if (!request.getTarget().equals(response.getTarget())) {
-                continue;
-            }
-
-            // Sanity-check logical sequence, ignore any out-of-order messages
-            if (request.getSequence() != response.getSequence()) {
-                LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
-                return Optional.empty();
-            }
-
-            // Now check session match
-            if (envelope.getSessionId() != txDetails.getSessionId()) {
-                LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
-                return Optional.empty();
-            }
-            if (envelope.getTxSequence() != txDetails.getTxSequence()) {
-                LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
-                return Optional.empty();
-            }
-
-            LOG.debug("Completing request {} with {}", request, envelope);
-            it.remove();
-            return Optional.of(e);
-        }
-
-        return null;
-    }
-
-    ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
-        Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
-        if (maybeEntry == null) {
-            maybeEntry = findMatchingEntry(lastInflight, envelope);
-        }
-
-        if (maybeEntry == null || !maybeEntry.isPresent()) {
-            LOG.warn("No request matching {} found, ignoring response", envelope);
-            return current;
-        }
-
-        lastProgress = ticker.read();
-        final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
-
-        // We have freed up a slot, try to transmit something
-        if (backend != null) {
-            final int toSend = lastTxLimit - currentInflight.size();
-            if (toSend > 0) {
-                runTransmit(toSend);
-            }
-        }
-
-        return ret;
-    }
-
-    private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
-        int toSend = count;
-
-        while (toSend > 0) {
-            final SequencedQueueEntry e = queue.poll();
-            if (e == null) {
-                break;
-            }
-
-            LOG.debug("Transmitting entry {}", e);
-            e.retransmit(backend, nextTxSequence(), lastProgress);
-            toSend--;
-        }
-
-        return toSend;
-    }
-
-    private void runTransmit(final int count) {
-        final int toSend;
-
-        // Process lastInflight first, possibly clearing it
-        if (!lastInflight.isEmpty()) {
-            toSend = transmitEntries(lastInflight, count);
-            if (lastInflight.isEmpty()) {
-                // We won't be needing the queue anymore, change it to specialized implementation
-                lastInflight = EmptyQueue.getInstance();
-            }
-        } else {
-            toSend = count;
-        }
-
-        // Process pending next.
-        transmitEntries(pending, toSend);
-    }
-
-    Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof,
-            final BackendInfo backend) {
-        Preconditions.checkNotNull(backend);
-        if (!proof.equals(backendProof)) {
-            LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
-            return Optional.empty();
-        }
-
-        LOG.debug("Resolved backend {}",  backend);
-
-        // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
-        // and also not to exceed new limits
-        final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
-        newLast.addAll(currentInflight);
-        newLast.addAll(lastInflight);
-        lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
-
-        // Clear currentInflight, possibly compacting it
-        final int txLimit = backend.getMaxMessages();
-        if (lastTxLimit > txLimit) {
-            currentInflight = new ArrayDeque<>();
-        } else {
-            currentInflight.clear();
-        }
-
-        // We are ready to roll
-        this.backend = backend;
-        backendProof = null;
-        txSequence = 0;
-        lastTxLimit = txLimit;
-        lastProgress = ticker.read();
-
-        // No pending requests, return
-        if (lastInflight.isEmpty() && pending.isEmpty()) {
-            return Optional.empty();
-        }
-
-        LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
-
-        runTransmit(lastTxLimit);
-
-        // Calculate next timer if necessary
-        if (expectingTimer == null) {
-            // Request transmission may have cost us some time. Recalculate timeout.
-            final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
-            expectingTimer = nextTicks;
-            return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
-        } else {
-            return Optional.empty();
-        }
-    }
-
-    boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
-        if (!proof.equals(backendProof)) {
-            LOG.debug("Setting resolution handle to {}", proof);
-            backendProof = proof;
-            return true;
-        } else {
-            LOG.trace("Already resolving handle {}", proof);
-            return false;
-        }
-    }
-
-    boolean hasCompleted() {
-        return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
-    }
-
-    /**
-     * Check queue timeouts and return true if a timeout has occured.
-     *
-     * @return True if a timeout occured
-     * @throws NoProgressException if the queue failed to make progress for an extended
-     *                             time.
-     */
-    boolean runTimeout() throws NoProgressException {
-        expectingTimer = null;
-        final long now = ticker.read();
-
-        if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
-            final long ticksSinceProgress = now - lastProgress;
-            if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
-                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
-                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
-
-                final NoProgressException ex = new NoProgressException(ticksSinceProgress);
-                poison(ex);
-                throw ex;
-            }
-        }
-
-        // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
-        final SequencedQueueEntry head = currentInflight.peek();
-        if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
-            backend = null;
-            LOG.debug("Queue {} invalidated backend info", this);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
-        queue.forEach(e -> e.poison(cause));
-        queue.clear();
-    }
-
-    void poison(final RequestException cause) {
-        close();
-
-        poisonQueue(currentInflight, cause);
-        poisonQueue(lastInflight, cause);
-        poisonQueue(pending, cause);
-    }
-
-    // FIXME: add a caller from ClientSingleTransaction
-    void close() {
-        notClosed = false;
-    }
-}