From dafc95d149bc62f101de37e94b9b5e3526d4e87b Mon Sep 17 00:00:00 2001 From: Vratko Polak Date: Fri, 14 Jul 2017 17:50:32 +0200 Subject: [PATCH] Bug 8619: Introduce inheritance of progress trackers + Introduce cancelDebt method. + Use the newly introduced functionality in client code. + Delete unused copy constructors (including unit test). Change-Id: Ib976343ed5f50c649ea08206c897cb70dead8b86 Signed-off-by: Vratko Polak Signed-off-by: Robert Varga (cherry picked from commit 12b4928ef66a82f4a128a11701663ac23143c1d7) --- .../client/AbstractClientConnection.java | 34 ++++- .../AbstractReceivingClientConnection.java | 11 +- .../client/AveragingProgressTracker.java | 38 +++-- .../access/client/ClientActorBehavior.java | 32 ++-- .../client/ConnectedClientConnection.java | 5 +- .../client/ConnectingClientConnection.java | 2 +- .../access/client/ProgressTracker.java | 141 +++++++++++------- .../cluster/access/client/TransmitQueue.java | 37 ++++- .../access/client/AccessClientUtil.java | 2 +- .../client/AveragingProgressTrackerTest.java | 23 --- .../client/ConnectedClientConnectionTest.java | 3 +- .../ConnectingClientConnectionTest.java | 10 +- .../ReconnectingClientConnectionTest.java | 8 +- .../client/TransmittingTransmitQueueTest.java | 16 +- 14 files changed, 225 insertions(+), 137 deletions(-) diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 5f23557dc3..1fa67632ea 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -89,21 +89,34 @@ public abstract class AbstractClientConnection { private volatile RequestException poisoned; + // Private constructor to avoid code duplication. + private AbstractClientConnection(final AbstractClientConnection oldConn, final TransmitQueue newQueue) { + this.context = Preconditions.checkNotNull(oldConn.context); + this.cookie = Preconditions.checkNotNull(oldConn.cookie); + this.queue = Preconditions.checkNotNull(newQueue); + // Will be updated in finishReplay if needed. + this.lastReceivedTicks = oldConn.lastReceivedTicks; + } + + // This constructor is only to be called by ConnectingClientConnection constructor. // Do not allow subclassing outside of this package - AbstractClientConnection(final ClientActorContext context, final Long cookie, - final TransmitQueue queue) { + AbstractClientConnection(final ClientActorContext context, final Long cookie, final int queueDepth) { this.context = Preconditions.checkNotNull(context); this.cookie = Preconditions.checkNotNull(cookie); - this.queue = Preconditions.checkNotNull(queue); + this.queue = new TransmitQueue.Halted(queueDepth); this.lastReceivedTicks = currentTime(); } + // This constructor is only to be called (indirectly) by ReconnectingClientConnection constructor. // Do not allow subclassing outside of this package - AbstractClientConnection(final AbstractClientConnection oldConnection, final int targetQueueSize) { - this.context = oldConnection.context; - this.cookie = oldConnection.cookie; - this.queue = new TransmitQueue.Halted(targetQueueSize); - this.lastReceivedTicks = oldConnection.lastReceivedTicks; + AbstractClientConnection(final AbstractClientConnection oldConn) { + this(oldConn, new TransmitQueue.Halted(oldConn.queue, oldConn.currentTime())); + } + + // This constructor is only to be called (indirectly) by ConnectedClientConnection constructor. + // Do not allow subclassing outside of this package + AbstractClientConnection(final AbstractClientConnection oldConn, final T newBackend, final int queueDepth) { + this(oldConn, new TransmitQueue.Transmitting(oldConn.queue, queueDepth, newBackend, oldConn.currentTime())); } public final ClientActorContext context() { @@ -172,6 +185,11 @@ public abstract class AbstractClientConnection { } } + // To be called from ClientActorBehavior on ConnectedClientConnection after entries are replayed. + final void cancelDebt() { + queue.cancelDebt(currentTime()); + } + public abstract Optional getBackendInfo(); final Collection startReplay() { diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java index c87556ce98..b369aea9dd 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.base.MoreObjects.ToStringHelper; -import com.google.common.base.Preconditions; import java.util.Optional; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.slf4j.Logger; @@ -40,13 +39,15 @@ abstract class AbstractReceivingClientConnection extends private final T backend; - AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) { - super(context, cookie, new TransmitQueue.Transmitting(targetQueueSize(backend), backend)); - this.backend = Preconditions.checkNotNull(backend); + // To be called by ConnectedClientConnection only. + AbstractReceivingClientConnection(final AbstractClientConnection oldConnection, final T newBackend) { + super(oldConnection, newBackend, targetQueueSize(newBackend)); + this.backend = newBackend; } + // To be called by ReconnectingClientConnection only. AbstractReceivingClientConnection(final AbstractReceivingClientConnection oldConnection) { - super(oldConnection, targetQueueSize(oldConnection.backend)); + super(oldConnection); this.backend = oldConnection.backend; } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTracker.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTracker.java index fe7b91f9dd..919aaf8bf8 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTracker.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTracker.java @@ -46,7 +46,7 @@ final class AveragingProgressTracker extends ProgressTracker { * @param limit of open tasks to avoid exceeding * @param ticksPerTask value to use as default */ - private AveragingProgressTracker(final int limit, final long ticksPerTask) { + private AveragingProgressTracker(final long limit, final long ticksPerTask) { super(ticksPerTask); tasksOpenLimit = limit; noDelayThreshold = limit / 2; @@ -57,22 +57,38 @@ final class AveragingProgressTracker extends ProgressTracker { * * @param limit of open tasks to avoid exceeding */ - AveragingProgressTracker(final int limit) { + AveragingProgressTracker(final long limit) { this(limit, DEFAULT_TICKS_PER_TASK); } /** - * Create a copy of an existing tracker, all future tracking is fully independent. + * Construct a new tracker suitable for a new task queue related to a "reconnect". * - * @param tracker the instance to copy state from + *

The limit is set independently of the old tracker. + * + * @param oldTracker the tracker used for the previously used backend + * @param limit of open tasks to avoid exceeding + * @param now tick number corresponding to caller's present + */ + AveragingProgressTracker(final ProgressTracker oldTracker, final long limit, final long now) { + super(oldTracker, now); + tasksOpenLimit = limit; + noDelayThreshold = limit / 2; + } + + /** + * Construct a new tracker suitable for a new task queue related to a "reconnect". + * + *

The limit is copied from the old tracker. + * + * @param oldTracker the tracker used for the previously used backend + * @param now tick number corresponding to caller's present */ - AveragingProgressTracker(final AveragingProgressTracker tracker) { - super(tracker); - this.tasksOpenLimit = tracker.tasksOpenLimit; - this.noDelayThreshold = tracker.noDelayThreshold; + AveragingProgressTracker(final AveragingProgressTracker oldTracker, final long now) { + this(oldTracker, oldTracker.tasksOpenLimit, now); } - // Public shared access (read-only) accessor-like methods + // Protected read-only methods /** * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored. @@ -89,7 +105,7 @@ final class AveragingProgressTracker extends ProgressTracker { * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again */ @Override - public long estimateIsolatedDelay(final long now) { + protected long estimateIsolatedDelay(final long now) { final long open = tasksOpen(); if (open <= noDelayThreshold) { return 0L; @@ -102,7 +118,7 @@ final class AveragingProgressTracker extends ProgressTracker { * Calculate the task capacity relative to the limit on open tasks. In real terms this value can be * in the open interval (0.0, 0.5). */ - final double relativeRemainingCapacity = 1.0 - (((double) open) / tasksOpenLimit); + final double relativeRemainingCapacity = 1.0 - (double) open / tasksOpenLimit; /* * Calculate delay coefficient. It increases in inverse proportion to relative remaining capacity, approaching diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index 0d55835cd9..79d7eceb14 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -284,22 +284,22 @@ public abstract class ClientActorBehavior extends @GuardedBy("connectionsLock") @Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection newConn); - private void backendConnectFinished(final Long shard, final AbstractClientConnection conn, + private void backendConnectFinished(final Long shard, final AbstractClientConnection oldConn, final T backend, final Throwable failure) { if (failure != null) { if (failure instanceof TimeoutException) { - if (!conn.equals(connections.get(shard))) { + if (!oldConn.equals(connections.get(shard))) { // AbstractClientConnection will remove itself when it decides there is no point in continuing, // at which point we want to stop retrying - LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, conn, - failure); + LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, + oldConn, failure); return; } LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard, RESOLVE_RETRY_DURATION, failure); context().executeInActor(b -> { - resolveConnection(shard, conn); + resolveConnection(shard, oldConn); return b; }, RESOLVE_RETRY_DURATION); return; @@ -313,7 +313,7 @@ public abstract class ClientActorBehavior extends cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure); } - conn.poison(cause); + oldConn.poison(cause); return; } @@ -323,29 +323,31 @@ public abstract class ClientActorBehavior extends final Stopwatch sw = Stopwatch.createStarted(); // Create a new connected connection - final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), - conn.cookie(), backend); - LOG.info("{}: resolving connection {} to {}", persistenceId(), conn, newConn); + final ConnectedClientConnection newConn = new ConnectedClientConnection<>(oldConn, backend); + LOG.info("{}: resolving connection {} to {}", persistenceId(), oldConn, newConn); // Start reconnecting without the old connection lock held final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn)); // Lock the old connection and get a reference to its entries - final Collection replayIterable = conn.startReplay(); + final Collection replayIterable = oldConn.startReplay(); // Finish the connection attempt final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable)); + // Cancel sleep debt after entries were replayed, before new connection starts receiving. + newConn.cancelDebt(); + // Install the forwarder, unlocking the old connection - conn.finishReplay(forwarder); + oldConn.finishReplay(forwarder); // Make sure new lookups pick up the new connection - if (!connections.replace(shard, conn, newConn)) { - final AbstractClientConnection existing = connections.get(conn.cookie()); + if (!connections.replace(shard, oldConn, newConn)) { + final AbstractClientConnection existing = connections.get(oldConn.cookie()); LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo", - persistenceId(), conn, existing, newConn); + persistenceId(), oldConn, existing, newConn); } else { - LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), conn, newConn, sw); + LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), oldConn, newConn, sw); } } finally { connectionsLock.unlockWrite(stamp); diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java index 0afc7acf49..c540142157 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java @@ -14,8 +14,9 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; @Beta @NotThreadSafe public final class ConnectedClientConnection extends AbstractReceivingClientConnection { - ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) { - super(context, cookie, backend); + + ConnectedClientConnection(final AbstractClientConnection oldConnection, final T newBackend) { + super(oldConnection, newBackend); } @Override diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java index 07ef769544..cae6981ee3 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java @@ -22,7 +22,7 @@ public final class ConnectingClientConnection extends Abs // Initial state, never instantiated externally ConnectingClientConnection(final ClientActorContext context, final Long cookie) { - super(context, cookie, new TransmitQueue.Halted(TARGET_QUEUE_DEPTH)); + super(context, cookie, TARGET_QUEUE_DEPTH); } @Override diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java index 027b35d873..15bfa569a2 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java @@ -99,37 +99,43 @@ abstract class ProgressTracker { } /** - * Construct a copy of an existing tracker, all future tracking is fully independent. + * Construct a new tracker suitable for a new task queue related to a "reconnect". * - * @param tracker the instance to copy state from - */ - ProgressTracker(final ProgressTracker tracker) { - this.defaultTicksPerTask = tracker.defaultTicksPerTask; - this.tasksClosed = tracker.tasksClosed; - this.tasksEncountered = tracker.tasksEncountered; - this.lastClosed = tracker.lastClosed; - this.lastIdle = tracker.lastIdle; - this.nearestAllowed = tracker.nearestAllowed; - this.elapsedBeforeIdle = tracker.elapsedBeforeIdle; - } - - // Public shared access (read-only) accessor-like methods - - /** - * Get the value of default ticks per task this instance was created to use. + *

When reconnecting to a new backend, tasks may need to be re-processed by the frontend, + * possibly resulting in a different number of tasks. + * Also, performance of the new backend can be different, but the perforance of the previous backend + * is generally still better estimate than defaults of a brand new tracker. * - * @return default ticks per task value + *

This "inheritance constructor" creates a new tracker with no open tasks (thus initially idle), + * but other internal values should lead to a balanced performance + * after tasks opened in the source tracker are "replayed" into the new tracker. + * + *

In particular, this impementation keeps the number of closed tasks the same, + * and makes it so ticksWorkedPerClosedTask is initially the same as in the old tracker. + * + * @param oldTracker the tracker used for the previously used backend + * @param now tick number corresponding to caller's present */ - public final long defaultTicksPerTask() { - return defaultTicksPerTask; + ProgressTracker(final ProgressTracker oldTracker, final long now) { + this.defaultTicksPerTask = oldTracker.defaultTicksPerTask; + this.tasksEncountered = this.tasksClosed = oldTracker.tasksClosed; + this.lastClosed = oldTracker.lastClosed; + this.nearestAllowed = oldTracker.nearestAllowed; // Call cancelDebt explicitly if needed. + this.lastIdle = oldTracker.lastIdle; + this.elapsedBeforeIdle = oldTracker.elapsedBeforeIdle; + if (!oldTracker.isIdle()) { + transitToIdle(now); + } } + // "Public" shared access (read-only) accessor-like methods + /** * Get number of tasks closed so far. * * @return number of tasks known to be finished already; the value never decreases */ - public final long tasksClosed() { + final long tasksClosed() { return tasksClosed; } @@ -138,7 +144,7 @@ abstract class ProgressTracker { * * @return number of tasks encountered so far, open or finished; the value never decreases */ - public final long tasksEncountered() { + final long tasksEncountered() { return tasksEncountered; } @@ -147,7 +153,7 @@ abstract class ProgressTracker { * * @return number of tasks started but not finished yet */ - public final long tasksOpen() { + final long tasksOpen() { // TODO: Should we return int? // TODO: Should we check the return value is non-negative? return tasksEncountered - tasksClosed; } @@ -157,7 +163,7 @@ abstract class ProgressTracker { * * @return {@code true} if every encountered task is already closed, {@code false} otherwise */ - public boolean isIdle() { + final boolean isIdle() { return tasksClosed >= tasksEncountered; } @@ -167,18 +173,19 @@ abstract class ProgressTracker { * @param now tick number corresponding to caller's present * @return number of ticks backend is neither idle nor responding */ - public long ticksStalling(final long now) { + final long ticksStalling(final long now) { return isIdle() ? 0 : Math.max(now, lastClosed) - lastClosed; } + // Read only protected methods. + /** - * Number of ticks elapsed (before now) while there was at least one open task. + * Get the value of default ticks per task this instance was created to use. * - * @param now tick number corresponding to caller's present - * @return number of ticks there was at least one task open + * @return default ticks per task value */ - public long ticksWorked(final long now) { - return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle; + protected final long defaultTicksPerTask() { + return defaultTicksPerTask; } /** @@ -187,25 +194,23 @@ abstract class ProgressTracker { * @param now tick number corresponding to caller's present * @return total ticks worked divided by closed tasks, or the default value if no closed tasks */ - public double ticksWorkedPerClosedTask(final long now) { + protected final double ticksWorkedPerClosedTask(final long now) { if (tasksClosed < 1) { return defaultTicksPerTask; } return (double) ticksWorked(now) / tasksClosed; } + // Read only private methods. + /** - * Give an estimate of openTask() return value. - * - *

When the returned delay is positive, the caller thread should wait that time before opening additional task. - * - *

This method in general takes into account previously assigned delays to avoid overlaps. + * Number of ticks elapsed (before now) while there was at least one open task. * * @param now tick number corresponding to caller's present - * @return delay (in ticks) after which another openTask() is fair to be called by the same thread again + * @return number of ticks there was at least one task open */ - public long estimateDelay(final long now) { - return estimateAllowed(now) - now; + private long ticksWorked(final long now) { + return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle; } /** @@ -218,11 +223,11 @@ abstract class ProgressTracker { * @param now tick number corresponding to caller's present * @return estimated tick number when all threads with opened tasks are done waiting */ - public long estimateAllowed(final long now) { + private long estimateAllowed(final long now) { return Math.max(now, nearestAllowed + estimateIsolatedDelay(now)); } - // State-altering public methods. + // State-altering "public" methods. /** * Track a task is being closed. @@ -232,11 +237,11 @@ abstract class ProgressTracker { * @param transmitTicks see TransitQueue#recordCompletion * @param execNanos see TransitQueue#recordCompletion */ - public void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) { + final void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) { if (isIdle()) { LOG.info("Attempted to close a task while no tasks are open"); } else { - protectedCloseTask(now, enqueuedTicks, transmitTicks, execNanos); + unsafeCloseTask(now, enqueuedTicks, transmitTicks, execNanos); } } @@ -246,13 +251,27 @@ abstract class ProgressTracker { * @param now tick number corresponding to caller's present * @return number of ticks (nanos) the caller thread should wait before opening another task */ - public long openTask(final long now) { - protectedOpenTask(now); + final long openTask(final long now) { + openTaskWithoutThrottle(now); return reserveDelay(now); } - // Internal state-altering methods. Protected instead of private, - // allowing subclasses to weaken ad-hoc invariants of current implementation. + /** + * Set nearestAllowed value to now. + * + *

This is useful when new a backend has just connected, + * after a period of no working backend present. + * The accumulated delays should not limit future tasks. + * The queue fullness and the averaged backend performance are kept, + * as they should result in good enough estimations for new tasks. + * + * @param now tick number corresponding to caller's present + */ + final void cancelDebt(final long now) { + nearestAllowed = now; + } + + // Private state-altering methods. /** * Compute the next delay and update nearestAllowed value accordingly. @@ -260,7 +279,7 @@ abstract class ProgressTracker { * @param now tick number corresponding to caller's present * @return number of ticks (nanos) the caller thread should wait before opening another task */ - protected long reserveDelay(final long now) { + private long reserveDelay(final long now) { nearestAllowed = estimateAllowed(now); return nearestAllowed - now; } @@ -276,12 +295,12 @@ abstract class ProgressTracker { * @param transmitTicks see TransmitQueue#recordCompletion * @param execNanos see TransmitQueue#recordCompletion */ - protected void protectedCloseTask(final long now, final long enqueuedTicks, final long transmitTicks, + private void unsafeCloseTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) { tasksClosed++; lastClosed = now; if (isIdle()) { - elapsedBeforeIdle += now - lastIdle; + transitToIdle(now); } } @@ -293,18 +312,34 @@ abstract class ProgressTracker { * * @param now tick number corresponding to caller's present */ - protected void protectedOpenTask(final long now) { + private void openTaskWithoutThrottle(final long now) { if (isIdle()) { - lastIdle = Math.max(now, lastIdle); + transitFromIdle(now); } tasksEncountered++; } + /** + * Update lastIdle as a new "last" just hapened. + */ + private void transitFromIdle(final long now) { + lastIdle = Math.max(now, lastIdle); + } + + /** + * Update elapsedBeforeIdle as the "before" has jast moved. + */ + private void transitToIdle(final long now) { + elapsedBeforeIdle += Math.max(0, now - lastIdle); + } + + // Protected abstract read-only methods. + /** * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored. * * @param now tick number corresponding to caller's present * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again */ - abstract long estimateIsolatedDelay(long now); + protected abstract long estimateIsolatedDelay(long now); } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index 6f283f549c..24264eeedf 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -54,10 +54,16 @@ import org.slf4j.LoggerFactory; @NotThreadSafe abstract class TransmitQueue { static final class Halted extends TransmitQueue { + // For ConnectingClientConnection. Halted(final int targetDepth) { super(targetDepth); } + // For ReconnectingClientConnection. + Halted(final TransmitQueue oldQueue, final long now) { + super(oldQueue, now); + } + @Override int canTransmitCount(final int inflightSize) { return 0; @@ -73,8 +79,9 @@ abstract class TransmitQueue { private final BackendInfo backend; private long nextTxSequence; - Transmitting(final int targetDepth, final BackendInfo backend) { - super(targetDepth); + // For ConnectedClientConnection. + Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) { + super(oldQueue, targetDepth, now); this.backend = Preconditions.checkNotNull(backend); } @@ -99,13 +106,37 @@ abstract class TransmitQueue { private final Deque inflight = new ArrayDeque<>(); private final Deque pending = new ArrayDeque<>(); - private final ProgressTracker tracker; + private final AveragingProgressTracker tracker; // Cannot be just ProgressTracker as we are inheriting limits. private ReconnectForwarder successor; + /** + * Construct initial transmitting queue. + */ TransmitQueue(final int targetDepth) { tracker = new AveragingProgressTracker(targetDepth); } + /** + * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance. + */ + TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) { + tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now); + } + + /** + * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance. + */ + TransmitQueue(final TransmitQueue oldQueue, final long now) { + tracker = new AveragingProgressTracker(oldQueue.tracker, now); + } + + /** + * Cancel the accumulated sum of delays as we expect the new backend to work now. + */ + void cancelDebt(final long now) { + tracker.cancelDebt(now); + } + /** * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries * to be added to it during replay. When we set the successor all entries enqueued between when this methods diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java index 77d5938bc9..e31fc55917 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java @@ -44,7 +44,7 @@ public class AccessClientUtil { public static ConnectedClientConnection createConnectedConnection( final ClientActorContext context, final Long cookie, final T backend) { - return new ConnectedClientConnection<>(context, cookie, backend); + return new ConnectedClientConnection<>(new ConnectingClientConnection<>(context, cookie), backend); } public static void completeRequest(final AbstractClientConnection connection, diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTrackerTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTrackerTest.java index 9ed77fedbe..0e0089977d 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTrackerTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTrackerTest.java @@ -8,15 +8,12 @@ package org.opendaylight.controller.cluster.access.client; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import com.google.common.testing.FakeTicker; import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; -import org.mockito.internal.matchers.apachecommons.ReflectionEquals; public class AveragingProgressTrackerTest { private static final long CHECKER = TimeUnit.MILLISECONDS.toNanos(500); @@ -66,24 +63,4 @@ public class AveragingProgressTrackerTest { assertEquals(0, averagingProgressTracker.estimateIsolatedDelay(ticker.read())); } - @Test - public void copyObjectTest() { - final AveragingProgressTracker copyAverageProgressTracker = new AveragingProgressTracker( - averagingProgressTracker); - - // copied object is the same as original - assertTrue(new ReflectionEquals(averagingProgressTracker).matches(copyAverageProgressTracker)); - - // afterwards work of copied tracker is independent - averagingProgressTracker.openTask(ticker.read()); - - final long time = ticker.read(); - assertNotEquals("Trackers are expected to return different results for tracking", - averagingProgressTracker.openTask(time), copyAverageProgressTracker.openTask(time)); - assertNotEquals("Trackers are expected to encounter different amount of tasks", - averagingProgressTracker.tasksEncountered(), copyAverageProgressTracker.tasksEncountered()); - - // and copied object is then no more the same as original - assertFalse(new ReflectionEquals(averagingProgressTracker).matches(copyAverageProgressTracker)); - } } \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java index 0f30806239..e6a09c1757 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java @@ -35,7 +35,8 @@ public class ConnectedClientConnectionTest @Override protected ConnectedClientConnection createConnection() { final BackendInfo backend = new BackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, 10); - return new ConnectedClientConnection<>(context, 0L, backend); + final ConnectingClientConnection connectingConn = new ConnectingClientConnection<>(context, 0L); + return new ConnectedClientConnection<>(connectingConn, backend); } @Override diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java index 7dd609ec27..b83c810aad 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java @@ -356,10 +356,12 @@ public class ConnectingClientConnectionTest { } private void setupBackend() { - final ConnectedClientConnection newConn = new ConnectedClientConnection<>(mockContext, mockCookie, - mockBackendInfo); - queue.setForwarder(new SimpleReconnectForwarder(newConn)); - queue = newConn; + final ConnectingClientConnection connectingConn = + new ConnectingClientConnection<>(mockContext, mockCookie); + final ConnectedClientConnection connectedConn = + new ConnectedClientConnection<>(connectingConn, mockBackendInfo); + queue.setForwarder(new SimpleReconnectForwarder(connectedConn)); + queue = connectedConn; } private void assertTransmit(final Request expected, final long sequence) { diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnectionTest.java index 0805f56dd4..679430cb71 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnectionTest.java @@ -43,10 +43,10 @@ public class ReconnectingClientConnectionTest @Override protected ReconnectingClientConnection createConnection() { final BackendInfo backend = new BackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, 10); - - final ConnectedClientConnection oldConnection = - new ConnectedClientConnection<>(context, 0L, backend); - return new ReconnectingClientConnection<>(oldConnection, mock(RequestException.class)); + final ConnectingClientConnection connectingConn = new ConnectingClientConnection<>(context, 0L); + final ConnectedClientConnection connectedConn = + new ConnectedClientConnection<>(connectingConn, backend); + return new ReconnectingClientConnection<>(connectedConn, mock(RequestException.class)); } @Override diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java index 6dd38ce213..11fc421903 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java @@ -43,6 +43,10 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref()); final Consumer> callback1 = createConsumerMock(); final Consumer> callback2 = createConsumerMock(); - final long now1 = Ticker.systemTicker().read(); - final long now2 = Ticker.systemTicker().read(); + final long now1 = now(); + final long now2 = now(); //enqueue 2 entries queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1); queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2); @@ -91,7 +95,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); - final long now = Ticker.systemTicker().read(); + final long now = now(); queue.enqueue(new ConnectionEntry(request, callback, now), now); final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); assertEquals(request, requestEnvelope.getMessage()); @@ -101,7 +105,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); - final long now = Ticker.systemTicker().read(); + final long now = now(); final int sentMessages = getMaxInFlightMessages() + 1; for (int i = 0; i < sentMessages; i++) { queue.enqueue(new ConnectionEntry(request, callback, now), now); @@ -127,7 +131,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); - final long now = Ticker.systemTicker().read(); + final long now = now(); final ConnectionEntry entry = new ConnectionEntry(request, callback, now); queue.transmit(entry, now); final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); -- 2.36.6