Bug 8619: Introduce inheritance of progress trackers 38/60538/1
authorVratko Polak <vrpolak@cisco.com>
Fri, 14 Jul 2017 15:50:32 +0000 (17:50 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 18 Jul 2017 14:35:06 +0000 (16:35 +0200)
+ Introduce cancelDebt method.
+ Use the newly introduced functionality in client code.
+ Delete unused copy constructors (including unit test).

Change-Id: Ib976343ed5f50c649ea08206c897cb70dead8b86
Signed-off-by: Vratko Polak <vrpolak@cisco.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 12b4928ef66a82f4a128a11701663ac23143c1d7)

14 files changed:
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTracker.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTrackerTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java

index 5f23557..1fa6763 100644 (file)
@@ -89,21 +89,34 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     private volatile RequestException poisoned;
 
+    // Private constructor to avoid code duplication.
+    private AbstractClientConnection(final AbstractClientConnection<T> oldConn, final TransmitQueue newQueue) {
+        this.context = Preconditions.checkNotNull(oldConn.context);
+        this.cookie = Preconditions.checkNotNull(oldConn.cookie);
+        this.queue = Preconditions.checkNotNull(newQueue);
+        // Will be updated in finishReplay if needed.
+        this.lastReceivedTicks = oldConn.lastReceivedTicks;
+    }
+
+    // This constructor is only to be called by ConnectingClientConnection constructor.
     // Do not allow subclassing outside of this package
-    AbstractClientConnection(final ClientActorContext context, final Long cookie,
-            final TransmitQueue queue) {
+    AbstractClientConnection(final ClientActorContext context, final Long cookie, final int queueDepth) {
         this.context = Preconditions.checkNotNull(context);
         this.cookie = Preconditions.checkNotNull(cookie);
-        this.queue = Preconditions.checkNotNull(queue);
+        this.queue = new TransmitQueue.Halted(queueDepth);
         this.lastReceivedTicks = currentTime();
     }
 
+    // This constructor is only to be called (indirectly) by ReconnectingClientConnection constructor.
     // Do not allow subclassing outside of this package
-    AbstractClientConnection(final AbstractClientConnection<T> oldConnection, final int targetQueueSize) {
-        this.context = oldConnection.context;
-        this.cookie = oldConnection.cookie;
-        this.queue = new TransmitQueue.Halted(targetQueueSize);
-        this.lastReceivedTicks = oldConnection.lastReceivedTicks;
+    AbstractClientConnection(final AbstractClientConnection<T> oldConn) {
+        this(oldConn, new TransmitQueue.Halted(oldConn.queue, oldConn.currentTime()));
+    }
+
+    // This constructor is only to be called (indirectly) by ConnectedClientConnection constructor.
+    // Do not allow subclassing outside of this package
+    AbstractClientConnection(final AbstractClientConnection<T> oldConn, final T newBackend, final int queueDepth) {
+        this(oldConn, new TransmitQueue.Transmitting(oldConn.queue, queueDepth, newBackend, oldConn.currentTime()));
     }
 
     public final ClientActorContext context() {
@@ -172,6 +185,11 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         }
     }
 
+    // To be called from ClientActorBehavior on ConnectedClientConnection after entries are replayed.
+    final void cancelDebt() {
+        queue.cancelDebt(currentTime());
+    }
+
     public abstract Optional<T> getBackendInfo();
 
     final Collection<ConnectionEntry> startReplay() {
index c87556c..b369aea 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.base.Preconditions;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.slf4j.Logger;
@@ -40,13 +39,15 @@ abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends
 
     private final T backend;
 
-    AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
-        super(context, cookie, new TransmitQueue.Transmitting(targetQueueSize(backend), backend));
-        this.backend = Preconditions.checkNotNull(backend);
+    // To be called by ConnectedClientConnection only.
+    AbstractReceivingClientConnection(final AbstractClientConnection<T> oldConnection, final T newBackend) {
+        super(oldConnection, newBackend, targetQueueSize(newBackend));
+        this.backend = newBackend;
     }
 
+    // To be called by ReconnectingClientConnection only.
     AbstractReceivingClientConnection(final AbstractReceivingClientConnection<T> oldConnection) {
-        super(oldConnection, targetQueueSize(oldConnection.backend));
+        super(oldConnection);
         this.backend = oldConnection.backend;
     }
 
index fe7b91f..919aaf8 100644 (file)
@@ -46,7 +46,7 @@ final class AveragingProgressTracker extends ProgressTracker {
      * @param limit of open tasks to avoid exceeding
      * @param ticksPerTask value to use as default
      */
-    private AveragingProgressTracker(final int limit, final long ticksPerTask) {
+    private AveragingProgressTracker(final long limit, final long ticksPerTask) {
         super(ticksPerTask);
         tasksOpenLimit = limit;
         noDelayThreshold = limit / 2;
@@ -57,22 +57,38 @@ final class AveragingProgressTracker extends ProgressTracker {
      *
      * @param limit of open tasks to avoid exceeding
      */
-    AveragingProgressTracker(final int limit) {
+    AveragingProgressTracker(final long limit) {
         this(limit, DEFAULT_TICKS_PER_TASK);
     }
 
     /**
-     * Create a copy of an existing tracker, all future tracking is fully independent.
+     * Construct a new tracker suitable for a new task queue related to a "reconnect".
      *
-     * @param tracker the instance to copy state from
+     * <p>The limit is set independently of the old tracker.
+     *
+     * @param oldTracker the tracker used for the previously used backend
+     * @param limit of open tasks to avoid exceeding
+     * @param now tick number corresponding to caller's present
+     */
+    AveragingProgressTracker(final ProgressTracker oldTracker, final long limit, final long now) {
+        super(oldTracker, now);
+        tasksOpenLimit = limit;
+        noDelayThreshold = limit / 2;
+    }
+
+    /**
+     * Construct a new tracker suitable for a new task queue related to a "reconnect".
+     *
+     * <p>The limit is copied from the old tracker.
+     *
+     * @param oldTracker the tracker used for the previously used backend
+     * @param now tick number corresponding to caller's present
      */
-    AveragingProgressTracker(final AveragingProgressTracker tracker) {
-        super(tracker);
-        this.tasksOpenLimit = tracker.tasksOpenLimit;
-        this.noDelayThreshold = tracker.noDelayThreshold;
+    AveragingProgressTracker(final AveragingProgressTracker oldTracker, final long now) {
+        this(oldTracker, oldTracker.tasksOpenLimit, now);
     }
 
-    // Public shared access (read-only) accessor-like methods
+    // Protected read-only methods
 
     /**
      * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
@@ -89,7 +105,7 @@ final class AveragingProgressTracker extends ProgressTracker {
      * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
      */
     @Override
-    public long estimateIsolatedDelay(final long now) {
+    protected long estimateIsolatedDelay(final long now) {
         final long open = tasksOpen();
         if (open <= noDelayThreshold) {
             return 0L;
@@ -102,7 +118,7 @@ final class AveragingProgressTracker extends ProgressTracker {
          * Calculate the task capacity relative to the limit on open tasks. In real terms this value can be
          * in the open interval (0.0, 0.5).
          */
-        final double relativeRemainingCapacity = 1.0 - (((double) open) / tasksOpenLimit);
+        final double relativeRemainingCapacity = 1.0 - (double) open / tasksOpenLimit;
 
         /*
          * Calculate delay coefficient. It increases in inverse proportion to relative remaining capacity, approaching
index 0d55835..79d7ece 100644 (file)
@@ -284,22 +284,22 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     @GuardedBy("connectionsLock")
     @Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection<T> newConn);
 
-    private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> conn,
+    private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> oldConn,
             final T backend, final Throwable failure) {
         if (failure != null) {
             if (failure instanceof TimeoutException) {
-                if (!conn.equals(connections.get(shard))) {
+                if (!oldConn.equals(connections.get(shard))) {
                     // AbstractClientConnection will remove itself when it decides there is no point in continuing,
                     // at which point we want to stop retrying
-                    LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, conn,
-                        failure);
+                    LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard,
+                        oldConn, failure);
                     return;
                 }
 
                 LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard,
                     RESOLVE_RETRY_DURATION, failure);
                 context().executeInActor(b -> {
-                    resolveConnection(shard, conn);
+                    resolveConnection(shard, oldConn);
                     return b;
                 }, RESOLVE_RETRY_DURATION);
                 return;
@@ -313,7 +313,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
                 cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure);
             }
 
-            conn.poison(cause);
+            oldConn.poison(cause);
             return;
         }
 
@@ -323,29 +323,31 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             final Stopwatch sw = Stopwatch.createStarted();
 
             // Create a new connected connection
-            final ConnectedClientConnection<T> newConn = new ConnectedClientConnection<>(conn.context(),
-                    conn.cookie(), backend);
-            LOG.info("{}: resolving connection {} to {}", persistenceId(), conn, newConn);
+            final ConnectedClientConnection<T> newConn = new ConnectedClientConnection<>(oldConn, backend);
+            LOG.info("{}: resolving connection {} to {}", persistenceId(), oldConn, newConn);
 
             // Start reconnecting without the old connection lock held
             final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn));
 
             // Lock the old connection and get a reference to its entries
-            final Collection<ConnectionEntry> replayIterable = conn.startReplay();
+            final Collection<ConnectionEntry> replayIterable = oldConn.startReplay();
 
             // Finish the connection attempt
             final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable));
 
+            // Cancel sleep debt after entries were replayed, before new connection starts receiving.
+            newConn.cancelDebt();
+
             // Install the forwarder, unlocking the old connection
-            conn.finishReplay(forwarder);
+            oldConn.finishReplay(forwarder);
 
             // Make sure new lookups pick up the new connection
-            if (!connections.replace(shard, conn, newConn)) {
-                final AbstractClientConnection<T> existing = connections.get(conn.cookie());
+            if (!connections.replace(shard, oldConn, newConn)) {
+                final AbstractClientConnection<T> existing = connections.get(oldConn.cookie());
                 LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo",
-                    persistenceId(), conn, existing, newConn);
+                    persistenceId(), oldConn, existing, newConn);
             } else {
-                LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), conn, newConn, sw);
+                LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), oldConn, newConn, sw);
             }
         } finally {
             connectionsLock.unlockWrite(stamp);
index 0afc7ac..c540142 100644 (file)
@@ -14,8 +14,9 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 @Beta
 @NotThreadSafe
 public final class ConnectedClientConnection<T extends BackendInfo> extends AbstractReceivingClientConnection<T> {
-    ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
-        super(context, cookie, backend);
+
+    ConnectedClientConnection(final AbstractClientConnection<T> oldConnection, final T newBackend) {
+        super(oldConnection, newBackend);
     }
 
     @Override
index 07ef769..cae6981 100644 (file)
@@ -22,7 +22,7 @@ public final class ConnectingClientConnection<T extends BackendInfo> extends Abs
 
     // Initial state, never instantiated externally
     ConnectingClientConnection(final ClientActorContext context, final Long cookie) {
-        super(context, cookie, new TransmitQueue.Halted(TARGET_QUEUE_DEPTH));
+        super(context, cookie, TARGET_QUEUE_DEPTH);
     }
 
     @Override
index 027b35d..15bfa56 100644 (file)
@@ -99,37 +99,43 @@ abstract class ProgressTracker {
     }
 
     /**
-     * Construct a copy of an existing tracker, all future tracking is fully independent.
+     * Construct a new tracker suitable for a new task queue related to a "reconnect".
      *
-     * @param tracker the instance to copy state from
-     */
-    ProgressTracker(final ProgressTracker tracker) {
-        this.defaultTicksPerTask = tracker.defaultTicksPerTask;
-        this.tasksClosed = tracker.tasksClosed;
-        this.tasksEncountered = tracker.tasksEncountered;
-        this.lastClosed = tracker.lastClosed;
-        this.lastIdle = tracker.lastIdle;
-        this.nearestAllowed = tracker.nearestAllowed;
-        this.elapsedBeforeIdle = tracker.elapsedBeforeIdle;
-    }
-
-    // Public shared access (read-only) accessor-like methods
-
-    /**
-     * Get the value of default ticks per task this instance was created to use.
+     * <p>When reconnecting to a new backend, tasks may need to be re-processed by the frontend,
+     * possibly resulting in a different number of tasks.
+     * Also, performance of the new backend can be different, but the perforance of the previous backend
+     * is generally still better estimate than defaults of a brand new tracker.
      *
-     * @return default ticks per task value
+     * <p>This "inheritance constructor" creates a new tracker with no open tasks (thus initially idle),
+     * but other internal values should lead to a balanced performance
+     * after tasks opened in the source tracker are "replayed" into the new tracker.
+     *
+     * <p>In particular, this impementation keeps the number of closed tasks the same,
+     * and makes it so ticksWorkedPerClosedTask is initially the same as in the old tracker.
+     *
+     * @param oldTracker the tracker used for the previously used backend
+     * @param now tick number corresponding to caller's present
      */
-    public final long defaultTicksPerTask() {
-        return defaultTicksPerTask;
+    ProgressTracker(final ProgressTracker oldTracker, final long now) {
+        this.defaultTicksPerTask = oldTracker.defaultTicksPerTask;
+        this.tasksEncountered = this.tasksClosed = oldTracker.tasksClosed;
+        this.lastClosed = oldTracker.lastClosed;
+        this.nearestAllowed = oldTracker.nearestAllowed;  // Call cancelDebt explicitly if needed.
+        this.lastIdle = oldTracker.lastIdle;
+        this.elapsedBeforeIdle = oldTracker.elapsedBeforeIdle;
+        if (!oldTracker.isIdle()) {
+            transitToIdle(now);
+        }
     }
 
+    // "Public" shared access (read-only) accessor-like methods
+
     /**
      * Get number of tasks closed so far.
      *
      * @return number of tasks known to be finished already; the value never decreases
      */
-    public final long tasksClosed() {
+    final long tasksClosed() {
         return tasksClosed;
     }
 
@@ -138,7 +144,7 @@ abstract class ProgressTracker {
      *
      * @return number of tasks encountered so far, open or finished; the value never decreases
      */
-    public final long tasksEncountered() {
+    final long tasksEncountered() {
         return tasksEncountered;
     }
 
@@ -147,7 +153,7 @@ abstract class ProgressTracker {
      *
      * @return number of tasks started but not finished yet
      */
-    public final long tasksOpen() {
+    final long tasksOpen() {  // TODO: Should we return int?
         // TODO: Should we check the return value is non-negative?
         return tasksEncountered - tasksClosed;
     }
@@ -157,7 +163,7 @@ abstract class ProgressTracker {
      *
      * @return {@code true} if every encountered task is already closed, {@code false} otherwise
      */
-    public boolean isIdle() {
+    final boolean isIdle() {
         return tasksClosed >= tasksEncountered;
     }
 
@@ -167,18 +173,19 @@ abstract class ProgressTracker {
      * @param now tick number corresponding to caller's present
      * @return number of ticks backend is neither idle nor responding
      */
-    public long ticksStalling(final long now) {
+    final long ticksStalling(final long now) {
         return isIdle() ? 0 : Math.max(now, lastClosed) - lastClosed;
     }
 
+    // Read only protected methods.
+
     /**
-     * Number of ticks elapsed (before now) while there was at least one open task.
+     * Get the value of default ticks per task this instance was created to use.
      *
-     * @param now tick number corresponding to caller's present
-     * @return number of ticks there was at least one task open
+     * @return default ticks per task value
      */
-    public long ticksWorked(final long now) {
-        return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
+    protected final long defaultTicksPerTask() {
+        return defaultTicksPerTask;
     }
 
     /**
@@ -187,25 +194,23 @@ abstract class ProgressTracker {
      * @param now tick number corresponding to caller's present
      * @return total ticks worked divided by closed tasks, or the default value if no closed tasks
      */
-    public double ticksWorkedPerClosedTask(final long now) {
+    protected final double ticksWorkedPerClosedTask(final long now) {
         if (tasksClosed < 1) {
             return defaultTicksPerTask;
         }
         return (double) ticksWorked(now) / tasksClosed;
     }
 
+    // Read only private methods.
+
     /**
-     * Give an estimate of openTask() return value.
-     *
-     * <p>When the returned delay is positive, the caller thread should wait that time before opening additional task.
-     *
-     * <p>This method in general takes into account previously assigned delays to avoid overlaps.
+     * Number of ticks elapsed (before now) while there was at least one open task.
      *
      * @param now tick number corresponding to caller's present
-     * @return delay (in ticks) after which another openTask() is fair to be called by the same thread again
+     * @return number of ticks there was at least one task open
      */
-    public long estimateDelay(final long now) {
-        return estimateAllowed(now) - now;
+    private long ticksWorked(final long now) {
+        return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
     }
 
     /**
@@ -218,11 +223,11 @@ abstract class ProgressTracker {
      * @param now tick number corresponding to caller's present
      * @return estimated tick number when all threads with opened tasks are done waiting
      */
-    public long estimateAllowed(final long now) {
+    private long estimateAllowed(final long now) {
         return Math.max(now, nearestAllowed + estimateIsolatedDelay(now));
     }
 
-    // State-altering public methods.
+    // State-altering "public" methods.
 
     /**
      * Track a task is being closed.
@@ -232,11 +237,11 @@ abstract class ProgressTracker {
      * @param transmitTicks see TransitQueue#recordCompletion
      * @param execNanos see TransitQueue#recordCompletion
      */
-    public void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
+    final void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
         if (isIdle()) {
             LOG.info("Attempted to close a task while no tasks are open");
         } else {
-            protectedCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
+            unsafeCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
         }
     }
 
@@ -246,13 +251,27 @@ abstract class ProgressTracker {
      * @param now tick number corresponding to caller's present
      * @return number of ticks (nanos) the caller thread should wait before opening another task
      */
-    public long openTask(final long now) {
-        protectedOpenTask(now);
+    final long openTask(final long now) {
+        openTaskWithoutThrottle(now);
         return reserveDelay(now);
     }
 
-    // Internal state-altering methods. Protected instead of private,
-    // allowing subclasses to weaken ad-hoc invariants of current implementation.
+    /**
+     * Set nearestAllowed value to now.
+     *
+     * <p>This is useful when new a backend has just connected,
+     * after a period of no working backend present.
+     * The accumulated delays should not limit future tasks.
+     * The queue fullness and the averaged backend performance are kept,
+     * as they should result in good enough estimations for new tasks.
+     *
+     * @param now tick number corresponding to caller's present
+     */
+    final void cancelDebt(final long now) {
+        nearestAllowed = now;
+    }
+
+    // Private state-altering methods.
 
     /**
      * Compute the next delay and update nearestAllowed value accordingly.
@@ -260,7 +279,7 @@ abstract class ProgressTracker {
      * @param now tick number corresponding to caller's present
      * @return number of ticks (nanos) the caller thread should wait before opening another task
      */
-    protected long reserveDelay(final long now) {
+    private long reserveDelay(final long now) {
         nearestAllowed = estimateAllowed(now);
         return nearestAllowed - now;
     }
@@ -276,12 +295,12 @@ abstract class ProgressTracker {
      * @param transmitTicks see TransmitQueue#recordCompletion
      * @param execNanos see TransmitQueue#recordCompletion
      */
-    protected void protectedCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
+    private void unsafeCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
                 final long execNanos) {
         tasksClosed++;
         lastClosed = now;
         if (isIdle()) {
-            elapsedBeforeIdle += now - lastIdle;
+            transitToIdle(now);
         }
     }
 
@@ -293,18 +312,34 @@ abstract class ProgressTracker {
      *
      * @param now tick number corresponding to caller's present
      */
-    protected void protectedOpenTask(final long now) {
+    private void openTaskWithoutThrottle(final long now) {
         if (isIdle()) {
-            lastIdle = Math.max(now, lastIdle);
+            transitFromIdle(now);
         }
         tasksEncountered++;
     }
 
+    /**
+     * Update lastIdle as a new "last" just hapened.
+     */
+    private void transitFromIdle(final long now) {
+        lastIdle = Math.max(now, lastIdle);
+    }
+
+    /**
+     * Update elapsedBeforeIdle as the "before" has jast moved.
+     */
+    private void transitToIdle(final long now) {
+        elapsedBeforeIdle += Math.max(0, now - lastIdle);
+    }
+
+    // Protected abstract read-only methods.
+
     /**
      * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
      *
      * @param now tick number corresponding to caller's present
      * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
      */
-    abstract long estimateIsolatedDelay(long now);
+    protected abstract long estimateIsolatedDelay(long now);
 }
index 6f283f5..24264ee 100644 (file)
@@ -54,10 +54,16 @@ import org.slf4j.LoggerFactory;
 @NotThreadSafe
 abstract class TransmitQueue {
     static final class Halted extends TransmitQueue {
+        // For ConnectingClientConnection.
         Halted(final int targetDepth) {
             super(targetDepth);
         }
 
+        // For ReconnectingClientConnection.
+        Halted(final TransmitQueue oldQueue, final long now) {
+            super(oldQueue, now);
+        }
+
         @Override
         int canTransmitCount(final int inflightSize) {
             return 0;
@@ -73,8 +79,9 @@ abstract class TransmitQueue {
         private final BackendInfo backend;
         private long nextTxSequence;
 
-        Transmitting(final int targetDepth, final BackendInfo backend) {
-            super(targetDepth);
+        // For ConnectedClientConnection.
+        Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) {
+            super(oldQueue, targetDepth, now);
             this.backend = Preconditions.checkNotNull(backend);
         }
 
@@ -99,13 +106,37 @@ abstract class TransmitQueue {
 
     private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
     private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
-    private final ProgressTracker tracker;
+    private final AveragingProgressTracker tracker;  // Cannot be just ProgressTracker as we are inheriting limits.
     private ReconnectForwarder successor;
 
+    /**
+     * Construct initial transmitting queue.
+     */
     TransmitQueue(final int targetDepth) {
         tracker = new AveragingProgressTracker(targetDepth);
     }
 
+    /**
+     * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
+     */
+    TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
+        tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
+    }
+
+    /**
+     * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
+     */
+    TransmitQueue(final TransmitQueue oldQueue, final long now) {
+        tracker = new AveragingProgressTracker(oldQueue.tracker, now);
+    }
+
+    /**
+     * Cancel the accumulated sum of delays as we expect the new backend to work now.
+     */
+    void cancelDebt(final long now) {
+        tracker.cancelDebt(now);
+    }
+
     /**
      * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
      * to be added to it during replay. When we set the successor all entries enqueued between when this methods
index 77d5938..e31fc55 100644 (file)
@@ -44,7 +44,7 @@ public class AccessClientUtil {
 
     public static <T extends BackendInfo> ConnectedClientConnection<T> createConnectedConnection(
             final ClientActorContext context, final Long cookie, final T backend) {
-        return new ConnectedClientConnection<>(context, cookie, backend);
+        return new ConnectedClientConnection<>(new ConnectingClientConnection<>(context, cookie), backend);
     }
 
     public static void completeRequest(final AbstractClientConnection<? extends BackendInfo> connection,
index 9ed77fe..0e00899 100644 (file)
@@ -8,15 +8,12 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.testing.FakeTicker;
 import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.internal.matchers.apachecommons.ReflectionEquals;
 
 public class AveragingProgressTrackerTest {
     private static final long CHECKER = TimeUnit.MILLISECONDS.toNanos(500);
@@ -66,24 +63,4 @@ public class AveragingProgressTrackerTest {
         assertEquals(0, averagingProgressTracker.estimateIsolatedDelay(ticker.read()));
     }
 
-    @Test
-    public void copyObjectTest() {
-        final AveragingProgressTracker copyAverageProgressTracker = new AveragingProgressTracker(
-                averagingProgressTracker);
-
-        // copied object is the same as original
-        assertTrue(new ReflectionEquals(averagingProgressTracker).matches(copyAverageProgressTracker));
-
-        // afterwards work of copied tracker is independent
-        averagingProgressTracker.openTask(ticker.read());
-
-        final long time = ticker.read();
-        assertNotEquals("Trackers are expected to return different results for tracking",
-                averagingProgressTracker.openTask(time), copyAverageProgressTracker.openTask(time));
-        assertNotEquals("Trackers are expected to encounter different amount of tasks",
-                averagingProgressTracker.tasksEncountered(), copyAverageProgressTracker.tasksEncountered());
-
-        // and copied object is then no more the same as original
-        assertFalse(new ReflectionEquals(averagingProgressTracker).matches(copyAverageProgressTracker));
-    }
 }
\ No newline at end of file
index 0f30806..e6a09c1 100644 (file)
@@ -35,7 +35,8 @@ public class ConnectedClientConnectionTest
     @Override
     protected ConnectedClientConnection<BackendInfo> createConnection() {
         final BackendInfo backend = new BackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, 10);
-        return new ConnectedClientConnection<>(context, 0L, backend);
+        final ConnectingClientConnection<BackendInfo> connectingConn = new ConnectingClientConnection<>(context, 0L);
+        return  new ConnectedClientConnection<>(connectingConn, backend);
     }
 
     @Override
index 7dd609e..b83c810 100644 (file)
@@ -356,10 +356,12 @@ public class ConnectingClientConnectionTest {
     }
 
     private void setupBackend() {
-        final ConnectedClientConnection<?> newConn = new ConnectedClientConnection<>(mockContext, mockCookie,
-                mockBackendInfo);
-        queue.setForwarder(new SimpleReconnectForwarder(newConn));
-        queue = newConn;
+        final ConnectingClientConnection<BackendInfo> connectingConn =
+                new ConnectingClientConnection<>(mockContext, mockCookie);
+        final ConnectedClientConnection<BackendInfo> connectedConn =
+                new ConnectedClientConnection<>(connectingConn, mockBackendInfo);
+        queue.setForwarder(new SimpleReconnectForwarder(connectedConn));
+        queue = connectedConn;
     }
 
     private void assertTransmit(final Request<?, ?> expected, final long sequence) {
index 0805f56..679430c 100644 (file)
@@ -43,10 +43,10 @@ public class ReconnectingClientConnectionTest
     @Override
     protected ReconnectingClientConnection<BackendInfo> createConnection() {
         final BackendInfo backend = new BackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, 10);
-
-        final ConnectedClientConnection<BackendInfo> oldConnection =
-                new ConnectedClientConnection<>(context, 0L, backend);
-        return new ReconnectingClientConnection<>(oldConnection, mock(RequestException.class));
+        final ConnectingClientConnection<BackendInfo> connectingConn = new ConnectingClientConnection<>(context, 0L);
+        final ConnectedClientConnection<BackendInfo> connectedConn =
+                new ConnectedClientConnection<>(connectingConn, backend);
+        return new ReconnectingClientConnection<>(connectedConn, mock(RequestException.class));
     }
 
     @Override
index 6dd38ce..11fc421 100644 (file)
@@ -43,6 +43,10 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
 
     private BackendInfo backendInfo;
 
+    private static long now() {
+        return Ticker.systemTicker().read();
+    }
+
     @Override
     protected int getMaxInFlightMessages() {
         return backendInfo.getMaxMessages();
@@ -51,7 +55,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
     @Override
     protected TransmitQueue.Transmitting createQueue() {
         backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
-        return new TransmitQueue.Transmitting(0, backendInfo);
+        return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now());
     }
 
     @Test
@@ -63,8 +67,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
         final Consumer<Response<?, ?>> callback1 = createConsumerMock();
         final Consumer<Response<?, ?>> callback2 = createConsumerMock();
-        final long now1 = Ticker.systemTicker().read();
-        final long now2 = Ticker.systemTicker().read();
+        final long now1 = now();
+        final long now2 = now();
         //enqueue 2 entries
         queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
         queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
@@ -91,7 +95,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
     public void testEnqueueCanTransmit() throws Exception {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
-        final long now = Ticker.systemTicker().read();
+        final long now = now();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
         assertEquals(request, requestEnvelope.getMessage());
@@ -101,7 +105,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
     public void testEnqueueBackendFull() throws Exception {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
-        final long now = Ticker.systemTicker().read();
+        final long now = now();
         final int sentMessages = getMaxInFlightMessages() + 1;
         for (int i = 0; i < sentMessages; i++) {
             queue.enqueue(new ConnectionEntry(request, callback, now), now);
@@ -127,7 +131,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
     public void testTransmit() throws Exception {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
-        final long now = Ticker.systemTicker().read();
+        final long now = now();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
         queue.transmit(entry, now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.