BUG-5280: make EmptyQueue public
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / SequencedQueue.java
index 513a3f936b9ddabac4131d519f82ccf3461e89d3..5cf7873a4b95efadc64ba335dc3004aea9333bb4 100644 (file)
@@ -10,16 +10,18 @@ package org.opendaylight.controller.cluster.access.client;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
-import java.util.Deque;
+import com.google.common.base.Verify;
+import java.util.ArrayDeque;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,18 +44,32 @@ final class SequencedQueue {
         TimeUnit.NANOSECONDS);
 
     /**
-     * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can
-     * progress at different speeds, these may be completed out of order.
-     *
-     * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to
-     *       figure out a more efficient lookup mechanism to deal with responses which do not match the queue
-     *       order.
+     * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
+     * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
+     * resolution.
      */
-    private final Deque<SequencedQueueEntry> queue = new LinkedList<>();
+    private static final int DEFAULT_TX_LIMIT = 1000;
+
     private final Ticker ticker;
     private final Long cookie;
 
-    // Updated/consulted from actor context only
+    /*
+     * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
+     * with the limit changing between reconnects (which imply retransmission).
+     *
+     * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
+     * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
+     * and one for requests which have not been transmitted at all.
+     *
+     * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
+     * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
+     * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
+     * to retransmit -- hence the second queue.
+     */
+    private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
+    private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
+    private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
+
     /**
      * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
      * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
@@ -62,6 +78,11 @@ final class SequencedQueue {
     private CompletionStage<? extends BackendInfo> backendProof;
     private BackendInfo backend;
 
+    // This is not final because we need to be able to replace it.
+    private long txSequence;
+
+    private int lastTxLimit = DEFAULT_TX_LIMIT;
+
     /**
      * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
      */
@@ -86,12 +107,16 @@ final class SequencedQueue {
         Preconditions.checkState(notClosed, "Queue %s is closed", this);
     }
 
+    private long nextTxSequence() {
+        return txSequence++;
+    }
+
     /**
      * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
      * the following scenarios:
      * 1) The request has been enqueued and transmitted. No further actions are necessary
      * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
-     * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that
+     * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
      *    process needs to complete before transmission occurs
      *
      * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
@@ -105,21 +130,32 @@ final class SequencedQueue {
      * @param callback Callback to be invoked
      * @return Optional duration with semantics described above.
      */
-    @Nullable Optional<FiniteDuration> enqueueRequest(final long sequence, final Request<?, ?> request,
-            final RequestCallback callback) {
+    @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
         checkNotClosed();
 
         final long now = ticker.read();
-        final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now);
-
-        queue.add(e);
-        LOG.debug("Enqueued request {} to queue {}", request, this);
-
+        final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
         if (backend == null) {
+            LOG.debug("No backend available, request resolution");
+            pending.add(e);
             return Optional.empty();
         }
+        if (!lastInflight.isEmpty()) {
+            LOG.debug("Retransmit not yet complete, delaying request {}", request);
+            pending.add(e);
+            return null;
+        }
+        if (currentInflight.size() >= lastTxLimit) {
+            LOG.debug("Queue is at capacity, delayed sending of request {}", request);
+            pending.add(e);
+            return null;
+        }
 
-        e.retransmit(backend, now);
+        // Ready to transmit
+        currentInflight.offer(e);
+        LOG.debug("Enqueued request {} to queue {}", request, this);
+
+        e.retransmit(backend, nextTxSequence(), now);
         if (expectingTimer == null) {
             expectingTimer = now + REQUEST_TIMEOUT_NANOS;
             return Optional.of(INITIAL_REQUEST_TIMEOUT);
@@ -128,53 +164,162 @@ final class SequencedQueue {
         }
     }
 
-    ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
-        // Responses to different targets may arrive out of order, hence we use an iterator
+    /*
+     * We are using tri-state return here to indicate one of three conditions:
+     * - if a matching entry is found, return an Optional containing it
+     * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
+     * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
+     */
+    private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
+            final ResponseEnvelope<?> envelope) {
+        // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
+        // to use an iterator
         final Iterator<SequencedQueueEntry> it = queue.iterator();
         while (it.hasNext()) {
             final SequencedQueueEntry e = it.next();
-            if (e.acceptsResponse(response)) {
-                lastProgress = ticker.read();
-                it.remove();
-                LOG.debug("Completing request {} with {}", e, response);
-                return e.complete(response.getMessage());
+            final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
+
+            final Request<?, ?> request = e.getRequest();
+            final Response<?, ?> response = envelope.getMessage();
+
+            // First check for matching target, or move to next entry
+            if (!request.getTarget().equals(response.getTarget())) {
+                continue;
+            }
+
+            // Sanity-check logical sequence, ignore any out-of-order messages
+            if (request.getSequence() != response.getSequence()) {
+                LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
+                return Optional.empty();
+            }
+
+            // Now check session match
+            if (envelope.getSessionId() != txDetails.getSessionId()) {
+                LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
+                return Optional.empty();
+            }
+            if (envelope.getTxSequence() != txDetails.getTxSequence()) {
+                LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
+                return Optional.empty();
             }
+
+            LOG.debug("Completing request {} with {}", request, envelope);
+            it.remove();
+            return Optional.of(e);
         }
 
-        LOG.debug("No request matching {} found", response);
-        return current;
+        return null;
+    }
+
+    ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
+        Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
+        if (maybeEntry == null) {
+            maybeEntry = findMatchingEntry(lastInflight, envelope);
+        }
+
+        if (maybeEntry == null || !maybeEntry.isPresent()) {
+            LOG.warn("No request matching {} found, ignoring response", envelope);
+            return current;
+        }
+
+        lastProgress = ticker.read();
+        final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
+
+        // We have freed up a slot, try to transmit something
+        if (backend != null) {
+            final int toSend = lastTxLimit - currentInflight.size();
+            if (toSend > 0) {
+                runTransmit(toSend);
+            }
+        }
+
+        return ret;
+    }
+
+    private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
+        int toSend = count;
+
+        while (toSend > 0) {
+            final SequencedQueueEntry e = queue.poll();
+            if (e == null) {
+                break;
+            }
+
+            LOG.debug("Transmitting entry {}", e);
+            e.retransmit(backend, nextTxSequence(), lastProgress);
+            toSend--;
+        }
+
+        return toSend;
+    }
+
+    private void runTransmit(final int count) {
+        final int toSend;
+
+        // Process lastInflight first, possibly clearing it
+        if (!lastInflight.isEmpty()) {
+            toSend = transmitEntries(lastInflight, count);
+            if (lastInflight.isEmpty()) {
+                // We won't be needing the queue anymore, change it to specialized implementation
+                lastInflight = EmptyQueue.getInstance();
+            }
+        } else {
+            toSend = count;
+        }
+
+        // Process pending next.
+        transmitEntries(pending, toSend);
     }
 
     Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
+        Preconditions.checkNotNull(backend);
         if (!proof.equals(backendProof)) {
             LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
             return Optional.empty();
         }
 
-        this.backend = Preconditions.checkNotNull(backend);
-        backendProof = null;
         LOG.debug("Resolved backend {}",  backend);
 
-        if (queue.isEmpty()) {
-            // No pending requests, hence no need for a timer
-            return Optional.empty();
+        // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
+        // and also not to exceed new limits
+        final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
+        newLast.addAll(currentInflight);
+        newLast.addAll(lastInflight);
+        lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
+
+        // Clear currentInflight, possibly compacting it
+        final int txLimit = backend.getMaxMessages();
+        if (lastTxLimit > txLimit) {
+            currentInflight = new ArrayDeque<>();
+        } else {
+            currentInflight.clear();
         }
 
-        LOG.debug("Resending requests to backend {}", backend);
-        final long now = ticker.read();
-        for (SequencedQueueEntry e : queue) {
-            e.retransmit(backend, now);
-        }
+        // We are ready to roll
+        this.backend = backend;
+        backendProof = null;
+        txSequence = 0;
+        lastTxLimit = txLimit;
+        lastProgress = ticker.read();
 
-        if (expectingTimer != null) {
-            // We already have a timer going, no need to schedule a new one
+        // No pending requests, return
+        if (lastInflight.isEmpty() && pending.isEmpty()) {
             return Optional.empty();
         }
 
-        // Above loop may have cost us some time. Recalculate timeout.
-        final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
-        expectingTimer = nextTicks;
-        return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS));
+        LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
+
+        runTransmit(lastTxLimit);
+
+        // Calculate next timer if necessary
+        if (expectingTimer == null) {
+            // Request transmission may have cost us some time. Recalculate timeout.
+            final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
+            expectingTimer = nextTicks;
+            return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
+        } else {
+            return Optional.empty();
+        }
     }
 
     boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
@@ -189,7 +334,7 @@ final class SequencedQueue {
     }
 
     boolean hasCompleted() {
-        return !notClosed && queue.isEmpty();
+        return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
     }
 
     /**
@@ -203,7 +348,7 @@ final class SequencedQueue {
         expectingTimer = null;
         final long now = ticker.read();
 
-        if (!queue.isEmpty()) {
+        if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
             final long ticksSinceProgress = now - lastProgress;
             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
@@ -216,7 +361,7 @@ final class SequencedQueue {
         }
 
         // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
-        final SequencedQueueEntry head = queue.peek();
+        final SequencedQueueEntry head = currentInflight.peek();
         if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
             backend = null;
             LOG.debug("Queue {} invalidated backend info", this);
@@ -226,14 +371,17 @@ final class SequencedQueue {
         }
     }
 
+    private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
+        queue.forEach(e -> e.poison(cause));
+        queue.clear();
+    }
+
     void poison(final RequestException cause) {
         close();
 
-        SequencedQueueEntry e = queue.poll();
-        while (e != null) {
-            e.poison(cause);
-            e = queue.poll();
-        }
+        poisonQueue(currentInflight, cause);
+        poisonQueue(lastInflight, cause);
+        poisonQueue(pending, cause);
     }
 
     // FIXME: add a caller from ClientSingleTransaction