Bump versions 9.0.4-SNAPSHOT
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / ProgressTracker.java
index 027b35d873f864457588631a426c7350029f4caa..677a57e770f6a3dd802d5ef22cddee268c58b0ec 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import com.google.common.base.Preconditions;
-import javax.annotation.concurrent.NotThreadSafe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -17,37 +16,39 @@ import org.slf4j.LoggerFactory;
  * Base class for tracking throughput and computing delays when processing stream of tasks.
  *
  * <p>The idea is to improve throughput in a typical request-response scenario.
- * A "frontend" is sending requests towards "backend", backend is sending responses back to fronted.
- * Both frontend and backend may be realized by multiple Java threads,
- * so there may be multiple requests not yet responded to.
- * In terms of taks processing, frontend is "opening" tasks and backend is "closing" them.
+ * Multiple "user" threads are submitting requests to a "frontend". The frontend does some
+ * pre-processing and then sends requests to (usually one) "backend". The backend does the main work
+ * and replies to the frontend, which reports to the the corresponding user.
+ * In terms of task processing, user threads are "opening" tasks and frontend is "closing" them.
  * Latency of the backend may fluctuate wildly. To avoid backend running out of open tasks,
- * there should be a queue of requests frontend can add to.
+ * frontend should maintain a queue of requests for users to submit tasks to.
  * In order to avoid excessive memory consumption, there should be a back-pressure mechanism
- * which blocks the frontend threads for appropriate durations.
- * Frontend can tolerate moderately delayed responses, but it only tolerates small block times.
+ * which blocks the user (submit) threads for appropriate durations.
+ * Users can tolerate moderately delayed responses, but they only tolerate small block (submit)
+ * times.
  *
  * <p>An ideal back-pressure algorithm would keep the queue reasonably full,
- * while fairly delaying the frontend threads. In other words, backend idle time should be low,
- * as well as frontend block time dispersion
+ * while fairly delaying the user threads. In other words, backend idle time should be low,
+ * as well as user block time dispersion
  * (as opposed to block time average, which is dictated by overall performance).
  *
  * <p>In order for an algorithm to compute reasonable wait times,
  * various inputs can be useful, mostly related to timing of various stages of task processing.
- * Methods of this class assume "enqueue and wait" usage.
- * The delay computation is pessimistic, it expects each participating thread to enqueue another task
- * as soon as its delay time allows.
+ * Methods of this class assume "enqueue and wait" usage, submit thread is supposed to block itself
+ * when asked to. The delay computation is pessimistic, it expects each participating thread
+ * to enqueue another task as soon as its delay time allows.
  *
- * <p>This class is not thread safe, the callers are responsible for guarding against conflicting access.
- * Time is measured in ticks (nanos), methods never look at current time, relying on {@code now} argument instead.
- * This means the sequence of {$code now} argument values is expected to be non-decreasing.
+ * <p>This class is to be used by single frontend. This class is not thread safe,
+ * the frontend is responsible for guarding against conflicting access.
+ * Time is measured in ticks (nanoseconds), methods never look at current time,
+ * relying on {@code now} argument where appropriate.
+ * This means the sequence of {@code now} argument values is expected to be non-decreasing.
  *
  * <p>Input data used for tracking is tightly coupled with TransitQueue#recordCompletion arguments.
  *
  * @author Vratko Polak
  */
 // TODO: Would bulk methods be less taxing than a loop of single task calls?
-@NotThreadSafe
 abstract class ProgressTracker {
     private static final Logger LOG = LoggerFactory.getLogger(ProgressTracker.class);
 
@@ -99,37 +100,44 @@ abstract class ProgressTracker {
     }
 
     /**
-     * Construct a copy of an existing tracker, all future tracking is fully independent.
+     * Construct a new tracker suitable for a new task queue related to a "reconnect".
      *
-     * @param tracker the instance to copy state from
-     */
-    ProgressTracker(final ProgressTracker tracker) {
-        this.defaultTicksPerTask = tracker.defaultTicksPerTask;
-        this.tasksClosed = tracker.tasksClosed;
-        this.tasksEncountered = tracker.tasksEncountered;
-        this.lastClosed = tracker.lastClosed;
-        this.lastIdle = tracker.lastIdle;
-        this.nearestAllowed = tracker.nearestAllowed;
-        this.elapsedBeforeIdle = tracker.elapsedBeforeIdle;
-    }
-
-    // Public shared access (read-only) accessor-like methods
-
-    /**
-     * Get the value of default ticks per task this instance was created to use.
+     * <p>When reconnecting to a new backend, tasks may need to be re-processed by the frontend,
+     * possibly resulting in a different number of tasks.
+     * Also, performance of the new backend can be different, but the perforance of the previous backend
+     * is generally still better estimate than defaults of a brand new tracker.
      *
-     * @return default ticks per task value
+     * <p>This "inheritance constructor" creates a new tracker with no open tasks (thus initially idle),
+     * but other internal values should lead to a balanced performance
+     * after tasks opened in the source tracker are "replayed" into the new tracker.
+     *
+     * <p>In particular, this impementation keeps the number of closed tasks the same,
+     * and makes it so ticksWorkedPerClosedTask is initially the same as in the old tracker.
+     *
+     * @param oldTracker the tracker used for the previously used backend
+     * @param now tick number corresponding to caller's present
      */
-    public final long defaultTicksPerTask() {
-        return defaultTicksPerTask;
+    ProgressTracker(final ProgressTracker oldTracker, final long now) {
+        defaultTicksPerTask = oldTracker.defaultTicksPerTask;
+        tasksEncountered = tasksClosed = oldTracker.tasksClosed;
+        lastClosed = oldTracker.lastClosed;
+        // Call cancelDebt explicitly if needed.
+        nearestAllowed = oldTracker.nearestAllowed;
+        lastIdle = oldTracker.lastIdle;
+        elapsedBeforeIdle = oldTracker.elapsedBeforeIdle;
+        if (!oldTracker.isIdle()) {
+            transitToIdle(now);
+        }
     }
 
+    // "Public" shared access (read-only) accessor-like methods
+
     /**
      * Get number of tasks closed so far.
      *
      * @return number of tasks known to be finished already; the value never decreases
      */
-    public final long tasksClosed() {
+    final long tasksClosed() {
         return tasksClosed;
     }
 
@@ -138,7 +146,7 @@ abstract class ProgressTracker {
      *
      * @return number of tasks encountered so far, open or finished; the value never decreases
      */
-    public final long tasksEncountered() {
+    final long tasksEncountered() {
         return tasksEncountered;
     }
 
@@ -147,7 +155,8 @@ abstract class ProgressTracker {
      *
      * @return number of tasks started but not finished yet
      */
-    public final long tasksOpen() {
+    // TODO: Should we return int?
+    final long tasksOpen() {
         // TODO: Should we check the return value is non-negative?
         return tasksEncountered - tasksClosed;
     }
@@ -157,7 +166,7 @@ abstract class ProgressTracker {
      *
      * @return {@code true} if every encountered task is already closed, {@code false} otherwise
      */
-    public boolean isIdle() {
+    final boolean isIdle() {
         return tasksClosed >= tasksEncountered;
     }
 
@@ -167,18 +176,19 @@ abstract class ProgressTracker {
      * @param now tick number corresponding to caller's present
      * @return number of ticks backend is neither idle nor responding
      */
-    public long ticksStalling(final long now) {
+    final long ticksStalling(final long now) {
         return isIdle() ? 0 : Math.max(now, lastClosed) - lastClosed;
     }
 
+    // Read only protected methods.
+
     /**
-     * Number of ticks elapsed (before now) while there was at least one open task.
+     * Get the value of default ticks per task this instance was created to use.
      *
-     * @param now tick number corresponding to caller's present
-     * @return number of ticks there was at least one task open
+     * @return default ticks per task value
      */
-    public long ticksWorked(final long now) {
-        return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
+    protected final long defaultTicksPerTask() {
+        return defaultTicksPerTask;
     }
 
     /**
@@ -187,25 +197,23 @@ abstract class ProgressTracker {
      * @param now tick number corresponding to caller's present
      * @return total ticks worked divided by closed tasks, or the default value if no closed tasks
      */
-    public double ticksWorkedPerClosedTask(final long now) {
+    protected final double ticksWorkedPerClosedTask(final long now) {
         if (tasksClosed < 1) {
             return defaultTicksPerTask;
         }
         return (double) ticksWorked(now) / tasksClosed;
     }
 
+    // Read only private methods.
+
     /**
-     * Give an estimate of openTask() return value.
-     *
-     * <p>When the returned delay is positive, the caller thread should wait that time before opening additional task.
-     *
-     * <p>This method in general takes into account previously assigned delays to avoid overlaps.
+     * Number of ticks elapsed (before now) while there was at least one open task.
      *
      * @param now tick number corresponding to caller's present
-     * @return delay (in ticks) after which another openTask() is fair to be called by the same thread again
+     * @return number of ticks there was at least one task open
      */
-    public long estimateDelay(final long now) {
-        return estimateAllowed(now) - now;
+    private long ticksWorked(final long now) {
+        return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
     }
 
     /**
@@ -218,11 +226,11 @@ abstract class ProgressTracker {
      * @param now tick number corresponding to caller's present
      * @return estimated tick number when all threads with opened tasks are done waiting
      */
-    public long estimateAllowed(final long now) {
+    private long estimateAllowed(final long now) {
         return Math.max(now, nearestAllowed + estimateIsolatedDelay(now));
     }
 
-    // State-altering public methods.
+    // State-altering "public" methods.
 
     /**
      * Track a task is being closed.
@@ -232,11 +240,11 @@ abstract class ProgressTracker {
      * @param transmitTicks see TransitQueue#recordCompletion
      * @param execNanos see TransitQueue#recordCompletion
      */
-    public void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
+    final void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
         if (isIdle()) {
             LOG.info("Attempted to close a task while no tasks are open");
         } else {
-            protectedCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
+            unsafeCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
         }
     }
 
@@ -246,13 +254,27 @@ abstract class ProgressTracker {
      * @param now tick number corresponding to caller's present
      * @return number of ticks (nanos) the caller thread should wait before opening another task
      */
-    public long openTask(final long now) {
-        protectedOpenTask(now);
+    final long openTask(final long now) {
+        openTaskWithoutThrottle(now);
         return reserveDelay(now);
     }
 
-    // Internal state-altering methods. Protected instead of private,
-    // allowing subclasses to weaken ad-hoc invariants of current implementation.
+    /**
+     * Set nearestAllowed value to now.
+     *
+     * <p>This is useful when new a backend has just connected,
+     * after a period of no working backend present.
+     * The accumulated delays should not limit future tasks.
+     * The queue fullness and the averaged backend performance are kept,
+     * as they should result in good enough estimations for new tasks.
+     *
+     * @param now tick number corresponding to caller's present
+     */
+    final void cancelDebt(final long now) {
+        nearestAllowed = now;
+    }
+
+    // Private state-altering methods.
 
     /**
      * Compute the next delay and update nearestAllowed value accordingly.
@@ -260,7 +282,7 @@ abstract class ProgressTracker {
      * @param now tick number corresponding to caller's present
      * @return number of ticks (nanos) the caller thread should wait before opening another task
      */
-    protected long reserveDelay(final long now) {
+    private long reserveDelay(final long now) {
         nearestAllowed = estimateAllowed(now);
         return nearestAllowed - now;
     }
@@ -276,12 +298,12 @@ abstract class ProgressTracker {
      * @param transmitTicks see TransmitQueue#recordCompletion
      * @param execNanos see TransmitQueue#recordCompletion
      */
-    protected void protectedCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
+    private void unsafeCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
                 final long execNanos) {
         tasksClosed++;
         lastClosed = now;
         if (isIdle()) {
-            elapsedBeforeIdle += now - lastIdle;
+            transitToIdle(now);
         }
     }
 
@@ -293,18 +315,34 @@ abstract class ProgressTracker {
      *
      * @param now tick number corresponding to caller's present
      */
-    protected void protectedOpenTask(final long now) {
+    private void openTaskWithoutThrottle(final long now) {
         if (isIdle()) {
-            lastIdle = Math.max(now, lastIdle);
+            transitFromIdle(now);
         }
         tasksEncountered++;
     }
 
+    /**
+     * Update lastIdle as a new "last" just hapened.
+     */
+    private void transitFromIdle(final long now) {
+        lastIdle = Math.max(now, lastIdle);
+    }
+
+    /**
+     * Update elapsedBeforeIdle as the "before" has jast moved.
+     */
+    private void transitToIdle(final long now) {
+        elapsedBeforeIdle += Math.max(0, now - lastIdle);
+    }
+
+    // Protected abstract read-only methods.
+
     /**
      * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
      *
      * @param now tick number corresponding to caller's present
      * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
      */
-    abstract long estimateIsolatedDelay(long now);
+    protected abstract long estimateIsolatedDelay(long now);
 }