Add JournalSegmentFile.map()
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
index 6f283f549cbdbdf50c75d99397fd650a2b709897..cc3da1e4503118ed5a064ceeb60fd34c4fe9e12d 100644 (file)
@@ -7,23 +7,28 @@
  */
 package org.opendaylight.controller.cluster.access.client;
 
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
-import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,34 +53,48 @@ import org.slf4j.LoggerFactory;
  *
  * <p>
  * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
- *
- * @author Robert Varga
  */
-@NotThreadSafe
-abstract class TransmitQueue {
+abstract sealed class TransmitQueue {
     static final class Halted extends TransmitQueue {
+        // For ConnectingClientConnection.
         Halted(final int targetDepth) {
             super(targetDepth);
         }
 
+        // For ReconnectingClientConnection.
+        Halted(final TransmitQueue oldQueue, final long now) {
+            super(oldQueue, now);
+        }
+
         @Override
         int canTransmitCount(final int inflightSize) {
             return 0;
         }
 
         @Override
-        TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
+        Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
             throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
         }
+
+        @Override
+        void preComplete(final ResponseEnvelope<?> envelope) {
+        }
     }
 
     static final class Transmitting extends TransmitQueue {
+        private static final long NOT_SLICING = -1;
+
         private final BackendInfo backend;
+        private final MessageSlicer messageSlicer;
         private long nextTxSequence;
-
-        Transmitting(final int targetDepth, final BackendInfo backend) {
-            super(targetDepth);
-            this.backend = Preconditions.checkNotNull(backend);
+        private long currentSlicedEnvSequenceId = NOT_SLICING;
+
+        // For ConnectedClientConnection.
+        Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
+                final MessageSlicer messageSlicer) {
+            super(oldQueue, targetDepth, now);
+            this.backend = requireNonNull(backend);
+            this.messageSlicer = requireNonNull(messageSlicer);
         }
 
         @Override
@@ -84,14 +103,42 @@ abstract class TransmitQueue {
         }
 
         @Override
-        TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
-            final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
+        Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
+            // If we're currently slicing a message we can't send any subsequent requests until slicing completes to
+            // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty
+            // Optional to indicate the request was not transmitted.
+            if (currentSlicedEnvSequenceId >= 0) {
+                return Optional.empty();
+            }
+
+            final Request<?, ?> request = entry.getRequest();
+            final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()),
                 backend.getSessionId(), nextTxSequence++);
 
-            final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
-                env.getTxSequence(), now);
-            backend.getActor().tell(env, ActorRef.noSender());
-            return ret;
+            if (request instanceof SliceableMessage) {
+                if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget())
+                        .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor())
+                        .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException(
+                                "Failed to slice request " + request, t), 0L)).build())) {
+                    // The request was sliced so record the envelope sequence id to prevent transmitting
+                    // subsequent requests until slicing completes.
+                    currentSlicedEnvSequenceId = env.getTxSequence();
+                }
+            } else {
+                backend.getActor().tell(env, ActorRef.noSender());
+            }
+
+            return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(),
+                    env.getTxSequence(), now));
+        }
+
+        @Override
+        void preComplete(final ResponseEnvelope<?> envelope) {
+            if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
+                // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
+                // requests to be transmitted.
+                currentSlicedEnvSequenceId = NOT_SLICING;
+            }
         }
     }
 
@@ -99,13 +146,38 @@ abstract class TransmitQueue {
 
     private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
     private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
-    private final ProgressTracker tracker;
+    // Cannot be just ProgressTracker as we are inheriting limits.
+    private final AveragingProgressTracker tracker;
     private ReconnectForwarder successor;
 
+    /**
+     * Construct initial transmitting queue.
+     */
     TransmitQueue(final int targetDepth) {
         tracker = new AveragingProgressTracker(targetDepth);
     }
 
+    /**
+     * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
+     */
+    TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
+        tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
+    }
+
+    /**
+     * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
+     */
+    TransmitQueue(final TransmitQueue oldQueue, final long now) {
+        tracker = new AveragingProgressTracker(oldQueue.tracker, now);
+    }
+
+    /**
+     * Cancel the accumulated sum of delays as we expect the new backend to work now.
+     */
+    void cancelDebt(final long now) {
+        tracker.cancelDebt(now);
+    }
+
     /**
      * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
      * to be added to it during replay. When we set the successor all entries enqueued between when this methods
@@ -132,6 +204,8 @@ abstract class TransmitQueue {
 
     // If a matching request was found, this will track a task was closed.
     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
+        preComplete(envelope);
+
         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
         if (maybeEntry == null) {
             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
@@ -143,7 +217,7 @@ abstract class TransmitQueue {
             return Optional.empty();
         }
 
-        final TransmittedConnectionEntry entry = maybeEntry.get();
+        final TransmittedConnectionEntry entry = maybeEntry.orElseThrow();
         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
 
         // We have freed up a slot, try to transmit something
@@ -162,37 +236,54 @@ abstract class TransmitQueue {
     private void transmitEntries(final int maxTransmit, final long now) {
         for (int i = 0; i < maxTransmit; ++i) {
             final ConnectionEntry e = pending.poll();
-            if (e == null) {
+            if (e == null || !transmitEntry(e, now)) {
                 LOG.debug("Queue {} transmitted {} requests", this, i);
                 return;
             }
-
-            transmitEntry(e, now);
         }
 
         LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
     }
 
-    private void transmitEntry(final ConnectionEntry entry, final long now) {
+    private boolean transmitEntry(final ConnectionEntry entry, final long now) {
         LOG.debug("Queue {} transmitting entry {}", this, entry);
         // We are not thread-safe and are supposed to be externally-guarded,
         // hence send-before-record should be fine.
         // This needs to be revisited if the external guards are lowered.
-        inflight.addLast(transmit(entry, now));
+        final Optional<TransmittedConnectionEntry> maybeTransmitted = transmit(entry, now);
+        if (!maybeTransmitted.isPresent()) {
+            return false;
+        }
+
+        inflight.addLast(maybeTransmitted.orElseThrow());
+        return true;
     }
 
-    /**
-     * Enqueue an entry, possibly also transmitting it.
-     *
-     * @return Delay to be forced on the calling thread, in nanoseconds.
-     */
-    final long enqueue(final ConnectionEntry entry, final long now) {
+    final long enqueueOrForward(final ConnectionEntry entry, final long now) {
         if (successor != null) {
             // This call will pay the enqueuing price, hence the caller does not have to
             successor.forwardEntry(entry, now);
             return 0;
         }
 
+        return enqueue(entry, now);
+    }
+
+    final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
+        if (successor != null) {
+            successor.replayEntry(entry, now);
+        } else {
+            enqueue(entry, now);
+        }
+    }
+
+    /**
+     * Enqueue an entry, possibly also transmitting it.
+     *
+     * @return Delay to be forced on the calling thread, in nanoseconds.
+     */
+    private long enqueue(final ConnectionEntry entry, final long now) {
+
         // XXX: we should place a guard against incorrect entry sequences:
         // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
 
@@ -211,7 +302,11 @@ abstract class TransmitQueue {
         }
 
         if (pending.isEmpty()) {
-            transmitEntry(entry, now);
+            if (!transmitEntry(entry, now)) {
+                LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest());
+                pending.addLast(entry);
+            }
+
             return delay;
         }
 
@@ -225,7 +320,9 @@ abstract class TransmitQueue {
      */
     abstract int canTransmitCount(int inflightSize);
 
-    abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
+    abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now);
+
+    abstract void preComplete(ResponseEnvelope<?> envelope);
 
     final boolean isEmpty() {
         return inflight.isEmpty() && pending.isEmpty();
@@ -240,14 +337,18 @@ abstract class TransmitQueue {
         return pending.peek();
     }
 
-    final void poison(final RequestException cause) {
-        poisonQueue(inflight, cause);
-        poisonQueue(pending, cause);
+    final List<ConnectionEntry> poison() {
+        final List<ConnectionEntry> entries = new ArrayList<>(inflight.size() + pending.size());
+        entries.addAll(inflight);
+        inflight.clear();
+        entries.addAll(pending);
+        pending.clear();
+        return entries;
     }
 
     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
-        Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this);
-        successor = Preconditions.checkNotNull(forwarder);
+        verify(successor == null, "Successor %s already set on connection %s", successor, this);
+        successor = requireNonNull(forwarder);
         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
 
         /*
@@ -323,12 +424,10 @@ abstract class TransmitQueue {
             }
 
             // Check if the entry has (ever) been transmitted
-            if (!(e instanceof TransmittedConnectionEntry)) {
+            if (!(e instanceof TransmittedConnectionEntry te)) {
                 return Optional.empty();
             }
 
-            final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
-
             // Now check session match
             if (envelope.getSessionId() != te.getSessionId()) {
                 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
@@ -346,13 +445,4 @@ abstract class TransmitQueue {
 
         return null;
     }
-
-    private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
-        for (ConnectionEntry e : queue) {
-            final Request<?, ?> request = e.getRequest();
-            LOG.trace("Poisoning request {}", request, cause);
-            e.complete(request.toRequestFailure(cause));
-        }
-        queue.clear();
-    }
 }