Bug 8619: Introduce inheritance of progress trackers
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
index 15ad958304f9b61cd1a20b322b12c7354542759e..24264eeedff77bca598e58d4d3f4f4a6ac996c34 100644 (file)
@@ -11,9 +11,9 @@ import akka.actor.ActorRef;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
-import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.Optional;
@@ -54,10 +54,16 @@ import org.slf4j.LoggerFactory;
 @NotThreadSafe
 abstract class TransmitQueue {
     static final class Halted extends TransmitQueue {
+        // For ConnectingClientConnection.
         Halted(final int targetDepth) {
             super(targetDepth);
         }
 
+        // For ReconnectingClientConnection.
+        Halted(final TransmitQueue oldQueue, final long now) {
+            super(oldQueue, now);
+        }
+
         @Override
         int canTransmitCount(final int inflightSize) {
             return 0;
@@ -73,8 +79,9 @@ abstract class TransmitQueue {
         private final BackendInfo backend;
         private long nextTxSequence;
 
-        Transmitting(final int targetDepth, final BackendInfo backend) {
-            super(targetDepth);
+        // For ConnectedClientConnection.
+        Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) {
+            super(oldQueue, targetDepth, now);
             this.backend = Preconditions.checkNotNull(backend);
         }
 
@@ -99,15 +106,51 @@ abstract class TransmitQueue {
 
     private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
     private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
-    private final ProgressTracker tracker;
+    private final AveragingProgressTracker tracker;  // Cannot be just ProgressTracker as we are inheriting limits.
     private ReconnectForwarder successor;
 
+    /**
+     * Construct initial transmitting queue.
+     */
     TransmitQueue(final int targetDepth) {
         tracker = new AveragingProgressTracker(targetDepth);
     }
 
-    final Iterable<ConnectionEntry> asIterable() {
-        return Iterables.concat(inflight, pending);
+    /**
+     * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
+     */
+    TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
+        tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
+    }
+
+    /**
+     * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
+     */
+    TransmitQueue(final TransmitQueue oldQueue, final long now) {
+        tracker = new AveragingProgressTracker(oldQueue.tracker, now);
+    }
+
+    /**
+     * Cancel the accumulated sum of delays as we expect the new backend to work now.
+     */
+    void cancelDebt(final long now) {
+        tracker.cancelDebt(now);
+    }
+
+    /**
+     * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
+     * to be added to it during replay. When we set the successor all entries enqueued between when this methods
+     * returns and the successor is set will be replayed to the successor.
+     *
+     * @return Collection of entries present in the queue.
+     */
+    final Collection<ConnectionEntry> drain() {
+        final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
+        ret.addAll(inflight);
+        ret.addAll(pending);
+        inflight.clear();
+        pending.clear();
+        return ret;
     }
 
     final long ticksStalling(final long now) {
@@ -135,12 +178,16 @@ abstract class TransmitQueue {
         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
 
         // We have freed up a slot, try to transmit something
+        tryTransmit(now);
+
+        return Optional.of(entry);
+    }
+
+    final void tryTransmit(final long now) {
         final int toSend = canTransmitCount(inflight.size());
         if (toSend > 0 && !pending.isEmpty()) {
             transmitEntries(toSend, now);
         }
-
-        return Optional.of(entry);
     }
 
     private void transmitEntries(final int maxTransmit, final long now) {
@@ -158,7 +205,7 @@ abstract class TransmitQueue {
     }
 
     private void transmitEntry(final ConnectionEntry entry, final long now) {
-        LOG.debug("Queue {} transmitting entry {}", entry);
+        LOG.debug("Queue {} transmitting entry {}", this, entry);
         // We are not thread-safe and are supposed to be externally-guarded,
         // hence send-before-record should be fine.
         // This needs to be revisited if the external guards are lowered.
@@ -172,6 +219,7 @@ abstract class TransmitQueue {
      */
     final long enqueue(final ConnectionEntry entry, final long now) {
         if (successor != null) {
+            // This call will pay the enqueuing price, hence the caller does not have to
             successor.forwardEntry(entry, now);
             return 0;
         }
@@ -229,20 +277,40 @@ abstract class TransmitQueue {
     }
 
     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
-        Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
+        Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this);
         successor = Preconditions.checkNotNull(forwarder);
         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
 
+        /*
+         * We need to account for entries which have been added between the time drain() was called and this method
+         * is invoked. Since the old connection is visible during replay and some entries may have completed on the
+         * replay thread, there was an avenue for this to happen.
+         */
+        int count = 0;
         ConnectionEntry entry = inflight.poll();
         while (entry != null) {
-            successor.forwardEntry(entry, now);
+            successor.replayEntry(entry, now);
             entry = inflight.poll();
+            count++;
         }
 
         entry = pending.poll();
         while (entry != null) {
-            successor.forwardEntry(entry, now);
+            successor.replayEntry(entry, now);
             entry = pending.poll();
+            count++;
+        }
+
+        LOG.debug("Connection {} queue spliced {} messages", this, count);
+    }
+
+    final void remove(final long now) {
+        final TransmittedConnectionEntry txe = inflight.poll();
+        if (txe == null) {
+            final ConnectionEntry entry = pending.pop();
+            tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
+        } else {
+            tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
         }
     }
 
@@ -318,5 +386,4 @@ abstract class TransmitQueue {
         }
         queue.clear();
     }
-
 }