BUG-8619: do not touch forward path during purge enqueue
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
index 6f283f549cbdbdf50c75d99397fd650a2b709897..705bb0d9cd6e7a32da5f45884bc3d07b6f2592ed 100644 (file)
@@ -54,10 +54,16 @@ import org.slf4j.LoggerFactory;
 @NotThreadSafe
 abstract class TransmitQueue {
     static final class Halted extends TransmitQueue {
+        // For ConnectingClientConnection.
         Halted(final int targetDepth) {
             super(targetDepth);
         }
 
+        // For ReconnectingClientConnection.
+        Halted(final TransmitQueue oldQueue, final long now) {
+            super(oldQueue, now);
+        }
+
         @Override
         int canTransmitCount(final int inflightSize) {
             return 0;
@@ -73,8 +79,9 @@ abstract class TransmitQueue {
         private final BackendInfo backend;
         private long nextTxSequence;
 
-        Transmitting(final int targetDepth, final BackendInfo backend) {
-            super(targetDepth);
+        // For ConnectedClientConnection.
+        Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) {
+            super(oldQueue, targetDepth, now);
             this.backend = Preconditions.checkNotNull(backend);
         }
 
@@ -99,13 +106,37 @@ abstract class TransmitQueue {
 
     private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
     private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
-    private final ProgressTracker tracker;
+    private final AveragingProgressTracker tracker;  // Cannot be just ProgressTracker as we are inheriting limits.
     private ReconnectForwarder successor;
 
+    /**
+     * Construct initial transmitting queue.
+     */
     TransmitQueue(final int targetDepth) {
         tracker = new AveragingProgressTracker(targetDepth);
     }
 
+    /**
+     * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
+     */
+    TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
+        tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
+    }
+
+    /**
+     * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
+     */
+    TransmitQueue(final TransmitQueue oldQueue, final long now) {
+        tracker = new AveragingProgressTracker(oldQueue.tracker, now);
+    }
+
+    /**
+     * Cancel the accumulated sum of delays as we expect the new backend to work now.
+     */
+    void cancelDebt(final long now) {
+        tracker.cancelDebt(now);
+    }
+
     /**
      * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
      * to be added to it during replay. When we set the successor all entries enqueued between when this methods
@@ -181,18 +212,31 @@ abstract class TransmitQueue {
         inflight.addLast(transmit(entry, now));
     }
 
-    /**
-     * Enqueue an entry, possibly also transmitting it.
-     *
-     * @return Delay to be forced on the calling thread, in nanoseconds.
-     */
-    final long enqueue(final ConnectionEntry entry, final long now) {
+    final long enqueueOrForward(final ConnectionEntry entry, final long now) {
         if (successor != null) {
             // This call will pay the enqueuing price, hence the caller does not have to
             successor.forwardEntry(entry, now);
             return 0;
         }
 
+        return enqueue(entry, now);
+    }
+
+    final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
+        if (successor != null) {
+            successor.replayEntry(entry, now);
+        } else {
+            enqueue(entry, now);
+        }
+    }
+
+    /**
+     * Enqueue an entry, possibly also transmitting it.
+     *
+     * @return Delay to be forced on the calling thread, in nanoseconds.
+     */
+    private long enqueue(final ConnectionEntry entry, final long now) {
+
         // XXX: we should place a guard against incorrect entry sequences:
         // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues