/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
}
/**
- * Construct a copy of an existing tracker, all future tracking is fully independent.
+ * Construct a new tracker suitable for a new task queue related to a "reconnect".
*
- * @param tracker the instance to copy state from
- */
- ProgressTracker(final ProgressTracker tracker) {
- this.defaultTicksPerTask = tracker.defaultTicksPerTask;
- this.tasksClosed = tracker.tasksClosed;
- this.tasksEncountered = tracker.tasksEncountered;
- this.lastClosed = tracker.lastClosed;
- this.lastIdle = tracker.lastIdle;
- this.nearestAllowed = tracker.nearestAllowed;
- this.elapsedBeforeIdle = tracker.elapsedBeforeIdle;
- }
-
- // Public shared access (read-only) accessor-like methods
-
- /**
- * Get the value of default ticks per task this instance was created to use.
+ * <p>When reconnecting to a new backend, tasks may need to be re-processed by the frontend,
+ * possibly resulting in a different number of tasks.
+ * Also, performance of the new backend can be different, but the perforance of the previous backend
+ * is generally still better estimate than defaults of a brand new tracker.
*
- * @return default ticks per task value
+ * <p>This "inheritance constructor" creates a new tracker with no open tasks (thus initially idle),
+ * but other internal values should lead to a balanced performance
+ * after tasks opened in the source tracker are "replayed" into the new tracker.
+ *
+ * <p>In particular, this impementation keeps the number of closed tasks the same,
+ * and makes it so ticksWorkedPerClosedTask is initially the same as in the old tracker.
+ *
+ * @param oldTracker the tracker used for the previously used backend
+ * @param now tick number corresponding to caller's present
*/
- public final long defaultTicksPerTask() {
- return defaultTicksPerTask;
+ ProgressTracker(final ProgressTracker oldTracker, final long now) {
+ this.defaultTicksPerTask = oldTracker.defaultTicksPerTask;
+ this.tasksEncountered = this.tasksClosed = oldTracker.tasksClosed;
+ this.lastClosed = oldTracker.lastClosed;
+ this.nearestAllowed = oldTracker.nearestAllowed; // Call cancelDebt explicitly if needed.
+ this.lastIdle = oldTracker.lastIdle;
+ this.elapsedBeforeIdle = oldTracker.elapsedBeforeIdle;
+ if (!oldTracker.isIdle()) {
+ transitToIdle(now);
+ }
}
+ // "Public" shared access (read-only) accessor-like methods
+
/**
* Get number of tasks closed so far.
*
* @return number of tasks known to be finished already; the value never decreases
*/
- public final long tasksClosed() {
+ final long tasksClosed() {
return tasksClosed;
}
*
* @return number of tasks encountered so far, open or finished; the value never decreases
*/
- public final long tasksEncountered() {
+ final long tasksEncountered() {
return tasksEncountered;
}
*
* @return number of tasks started but not finished yet
*/
- public final long tasksOpen() {
+ final long tasksOpen() { // TODO: Should we return int?
// TODO: Should we check the return value is non-negative?
return tasksEncountered - tasksClosed;
}
*
* @return {@code true} if every encountered task is already closed, {@code false} otherwise
*/
- public boolean isIdle() {
+ final boolean isIdle() {
return tasksClosed >= tasksEncountered;
}
* @param now tick number corresponding to caller's present
* @return number of ticks backend is neither idle nor responding
*/
- public long ticksStalling(final long now) {
+ final long ticksStalling(final long now) {
return isIdle() ? 0 : Math.max(now, lastClosed) - lastClosed;
}
+ // Read only protected methods.
+
/**
- * Number of ticks elapsed (before now) while there was at least one open task.
+ * Get the value of default ticks per task this instance was created to use.
*
- * @param now tick number corresponding to caller's present
- * @return number of ticks there was at least one task open
+ * @return default ticks per task value
*/
- public long ticksWorked(final long now) {
- return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
+ protected final long defaultTicksPerTask() {
+ return defaultTicksPerTask;
}
/**
* @param now tick number corresponding to caller's present
* @return total ticks worked divided by closed tasks, or the default value if no closed tasks
*/
- public double ticksWorkedPerClosedTask(final long now) {
+ protected final double ticksWorkedPerClosedTask(final long now) {
if (tasksClosed < 1) {
return defaultTicksPerTask;
}
return (double) ticksWorked(now) / tasksClosed;
}
+ // Read only private methods.
+
/**
- * Give an estimate of openTask() return value.
- *
- * <p>When the returned delay is positive, the caller thread should wait that time before opening additional task.
- *
- * <p>This method in general takes into account previously assigned delays to avoid overlaps.
+ * Number of ticks elapsed (before now) while there was at least one open task.
*
* @param now tick number corresponding to caller's present
- * @return delay (in ticks) after which another openTask() is fair to be called by the same thread again
+ * @return number of ticks there was at least one task open
*/
- public long estimateDelay(final long now) {
- return estimateAllowed(now) - now;
+ private long ticksWorked(final long now) {
+ return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
}
/**
* @param now tick number corresponding to caller's present
* @return estimated tick number when all threads with opened tasks are done waiting
*/
- public long estimateAllowed(final long now) {
- return Math.max(now, nearestAllowed) + estimateIsolatedDelay(now);
+ private long estimateAllowed(final long now) {
+ return Math.max(now, nearestAllowed + estimateIsolatedDelay(now));
}
- // State-altering public methods.
+ // State-altering "public" methods.
/**
* Track a task is being closed.
* @param transmitTicks see TransitQueue#recordCompletion
* @param execNanos see TransitQueue#recordCompletion
*/
- public void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
+ final void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
if (isIdle()) {
LOG.info("Attempted to close a task while no tasks are open");
} else {
- protectedCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
+ unsafeCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
}
}
* @param now tick number corresponding to caller's present
* @return number of ticks (nanos) the caller thread should wait before opening another task
*/
- public long openTask(final long now) {
- protectedOpenTask(now);
+ final long openTask(final long now) {
+ openTaskWithoutThrottle(now);
return reserveDelay(now);
}
- // Internal state-altering methods. Protected instead of private,
- // allowing subclasses to weaken ad-hoc invariants of current implementation.
+ /**
+ * Set nearestAllowed value to now.
+ *
+ * <p>This is useful when new a backend has just connected,
+ * after a period of no working backend present.
+ * The accumulated delays should not limit future tasks.
+ * The queue fullness and the averaged backend performance are kept,
+ * as they should result in good enough estimations for new tasks.
+ *
+ * @param now tick number corresponding to caller's present
+ */
+ final void cancelDebt(final long now) {
+ nearestAllowed = now;
+ }
+
+ // Private state-altering methods.
/**
* Compute the next delay and update nearestAllowed value accordingly.
* @param now tick number corresponding to caller's present
* @return number of ticks (nanos) the caller thread should wait before opening another task
*/
- protected long reserveDelay(final long now) {
+ private long reserveDelay(final long now) {
nearestAllowed = estimateAllowed(now);
return nearestAllowed - now;
}
* This call can empty the collection of open tasks, that special case should be handled.
*
* @param now tick number corresponding to caller's present
- * @param enqueuedTicks see TransitQueue#recordCompletion
- * @param transmitTicks see TransitQueue#recordCompletion
- * @param execNanos see TransitQueue#recordCompletion
+ * @param enqueuedTicks see TransmitQueue#recordCompletion
+ * @param transmitTicks see TransmitQueue#recordCompletion
+ * @param execNanos see TransmitQueue#recordCompletion
*/
- protected void protectedCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
+ private void unsafeCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
final long execNanos) {
tasksClosed++;
lastClosed = now;
if (isIdle()) {
- elapsedBeforeIdle += now - lastIdle;
+ transitToIdle(now);
}
}
*
* @param now tick number corresponding to caller's present
*/
- protected void protectedOpenTask(final long now) {
+ private void openTaskWithoutThrottle(final long now) {
if (isIdle()) {
- lastIdle = Math.max(now, lastIdle);
+ transitFromIdle(now);
}
tasksEncountered++;
}
+ /**
+ * Update lastIdle as a new "last" just hapened.
+ */
+ private void transitFromIdle(final long now) {
+ lastIdle = Math.max(now, lastIdle);
+ }
+
+ /**
+ * Update elapsedBeforeIdle as the "before" has jast moved.
+ */
+ private void transitToIdle(final long now) {
+ elapsedBeforeIdle += Math.max(0, now - lastIdle);
+ }
+
+ // Protected abstract read-only methods.
+
/**
* Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
*
* @param now tick number corresponding to caller's present
* @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
*/
- abstract long estimateIsolatedDelay(final long now);
+ protected abstract long estimateIsolatedDelay(long now);
}