Bug 5280: Add ProgressTracker 53/49053/18
authorVratko Polak <vrpolak@cisco.com>
Thu, 22 Dec 2016 09:39:00 +0000 (10:39 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Sat, 21 Jan 2017 18:21:52 +0000 (18:21 +0000)
ProgressTracker determines the tracking delay required
to keep the outbound queue reasonably full. Unlike Guava's
RateLimiter, we can assume cooperative callers, hence we
can use an enqueue-and-wait strategy as appropriate. This
lowers contention in multi-threaded access, as the wait
part of the cycle is spent outside the lock.

Change-Id: Iaea65c171455b89d7117431599ebc65fe0a4f19a
Signed-off-by: Vratko Polak <vrpolak@cisco.com>
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTracker.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java

index 7dc150e403dc283a2181aad081fb1757afc690ea..ac4ac785d6e28dcaf08de1a82a85bf2610020186 100644 (file)
@@ -50,9 +50,6 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     private final TransmitQueue queue;
     private final Long cookie;
 
-    // Updated from actor thread only
-    private long lastProgress;
-
     private volatile RequestException poisoned;
 
     // Do not allow subclassing outside of this package
@@ -61,15 +58,13 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         this.context = Preconditions.checkNotNull(context);
         this.cookie = Preconditions.checkNotNull(cookie);
         this.queue = Preconditions.checkNotNull(queue);
-        this.lastProgress = readTime();
     }
 
     // Do not allow subclassing outside of this package
-    AbstractClientConnection(final AbstractClientConnection<T> oldConnection) {
+    AbstractClientConnection(final AbstractClientConnection<T> oldConnection, final int targetQueueSize) {
         this.context = oldConnection.context;
         this.cookie = oldConnection.cookie;
-        this.lastProgress = oldConnection.lastProgress;
-        this.queue = new TransmitQueue.Halted();
+        this.queue = new TransmitQueue.Halted(targetQueueSize);
     }
 
     public final ClientActorContext context() {
@@ -88,6 +83,9 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
      * from any thread.
      *
+     * <p>This method may put the caller thread to sleep in order to throttle the request rate.
+     * The callback may be called before the sleep finishes.
+     *
      * @param request Request to send
      * @param callback Callback to invoke
      */
@@ -98,13 +96,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         }
 
         final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
-
-        lock.lock();
-        try {
-            queue.enqueue(entry, entry.getEnqueuedTicks());
-        } finally {
-            lock.unlock();
-        }
+        enqueueAndWait(entry, entry.getEnqueuedTicks());
     }
 
     public abstract Optional<T> getBackendInfo();
@@ -132,15 +124,24 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         return context.ticker().read();
     }
 
-    final void enqueueEntry(final ConnectionEntry entry, final long now) {
+    final long enqueueEntry(final ConnectionEntry entry, final long now) {
         lock.lock();
         try {
-            queue.enqueue(entry, now);
+            return queue.enqueue(entry, now);
         } finally {
             lock.unlock();
         }
     }
 
+    final void enqueueAndWait(final ConnectionEntry entry, final long now) {
+        final long delay = enqueueEntry(entry, now);
+        try {
+            TimeUnit.NANOSECONDS.sleep(delay);
+        } catch (InterruptedException e) {
+            LOG.debug("Interrupted while sleeping");
+        }
+    }
+
     /**
      * Schedule a timer to fire on the actor thread after a delay.
      *
@@ -165,16 +166,15 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         lock.lock();
         try {
             final long now = readTime();
-            if (!queue.isEmpty()) {
-                final long ticksSinceProgress = now - lastProgress;
-                if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
-                    LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
-                        TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
-
-                    lockedPoison(new NoProgressException(ticksSinceProgress));
-                    current.removeConnection(this);
-                    return current;
-                }
+            // The following line is only reliable when queue is not forwarding, but such state should not last long.
+            final long ticksSinceProgress = queue.ticksStalling(now);
+            if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
+                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
+                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+
+                lockedPoison(new NoProgressException(ticksSinceProgress));
+                current.removeConnection(this);
+                return current;
             }
 
             // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
@@ -222,13 +222,13 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             return Optional.empty();
         }
 
-        final long delay = head.getEnqueuedTicks() - now + REQUEST_TIMEOUT_NANOS;
-        if (delay <= 0) {
-            LOG.debug("Connection {} timed out", this);
+        final long beenOpen = now - head.getEnqueuedTicks();
+        if (beenOpen >= REQUEST_TIMEOUT_NANOS) {
+            LOG.debug("Connection {} has a request not completed for {} nanoseconds, timing out", this, beenOpen);
             return null;
         }
 
-        return Optional.of(FiniteDuration.apply(delay, TimeUnit.NANOSECONDS));
+        return Optional.of(FiniteDuration.apply(REQUEST_TIMEOUT_NANOS - beenOpen, TimeUnit.NANOSECONDS));
     }
 
     final void poison(final RequestException cause) {
@@ -267,7 +267,5 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             LOG.debug("Completing {} with {}", entry, envelope);
             entry.complete(envelope.getMessage());
         }
-
-        lastProgress = readTime();
     }
 }
index 15da294ec99fbcd4ed49417500d55b8dae950432..8d9ed24043f41758ef461dc29fb74aefe0008373 100644 (file)
@@ -12,25 +12,42 @@ import java.util.Optional;
 
 /**
  * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its
- * sublcasses. It allows us to share some code.
+ * subclasses. It allows us to share some code.
  *
  * @author Robert Varga
  *
  * @param <T> Concrete {@link BackendInfo} type
  */
 abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
+    /**
+     * Multiplication factor applied to remote's advertised limit on outstanding messages. Our default strategy
+     * rate-limiting strategy in {@link AveragingProgressTracker} does not penalize threads as long as we have not
+     * reached half of the target.
+     *
+     * <p>
+     * By multiplying the advertised maximum by four, our queue steady-state should end up with:
+     * - the backend pipeline being full,
+     * - another full batch of messages being in the queue while not paying any throttling cost
+     * - another 2 full batches of messages with incremental throttling cost
+     */
+    private static final int MESSAGE_QUEUE_FACTOR = 4;
+
     private final T backend;
 
     AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
-        super(context, cookie, new TransmitQueue.Transmitting(backend));
+        super(context, cookie, new TransmitQueue.Transmitting(targetQueueSize(backend), backend));
         this.backend = Preconditions.checkNotNull(backend);
     }
 
     AbstractReceivingClientConnection(final AbstractReceivingClientConnection<T> oldConnection) {
-        super(oldConnection);
+        super(oldConnection, targetQueueSize(oldConnection.backend));
         this.backend = oldConnection.backend;
     }
 
+    private static int targetQueueSize(final BackendInfo backend) {
+        return backend.getMaxMessages() * MESSAGE_QUEUE_FACTOR;
+    }
+
     @Override
     public final Optional<T> getBackendInfo() {
         return Optional.of(backend);
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTracker.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTracker.java
new file mode 100644 (file)
index 0000000..1c6210a
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.access.client;
+
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * A ProgressTracker subclass which uses {@code ticksWorkedPerClosedTask} to compute delays.
+ *
+ * <p>This class has {@code tasksOpenLimit} used as a (weak) limit,
+ * as number of open tasks approaches that value, delays computed are increasing.
+ *
+ * <p>In order to keep delays from raising to unreasonably high values,
+ * a maximal delay (per task) value is never exceeded.
+ *
+ * <p>On the other hand, there is no delay when number of open tasks is half the limit or less,
+ * in order to prevent backend from running out of tasks while there may be waiting frontend threads.
+ *
+ * @author Vratko Polak
+ */
+@NotThreadSafe
+final class AveragingProgressTracker extends ProgressTracker {
+    private static final long DEFAULT_TICKS_PER_TASK = TimeUnit.MILLISECONDS.toNanos(500);
+
+    /**
+     * The implementation will avoid having more that this number of tasks open.
+     */
+    private final long tasksOpenLimit;
+
+    /**
+     * We do not delay tasks until their count hits this threshold.
+     */
+    private final long noDelayThreshold;
+
+    /**
+     * Create an idle tracker with limit and specified ticks per task value to use as default.
+     *
+     * @param limit of open tasks to avoid exceeding
+     * @param ticksPerTask value to use as default
+     */
+    private AveragingProgressTracker(final int limit, final long ticksPerTask) {
+        super(ticksPerTask);
+        tasksOpenLimit = limit;
+        noDelayThreshold = limit / 2;
+    }
+
+    /**
+     * Create a default idle tracker with given limit.
+     *
+     * @param limit of open tasks to avoid exceeding
+     */
+    AveragingProgressTracker(final int limit) {
+        this(limit, DEFAULT_TICKS_PER_TASK);
+    }
+
+    /**
+     * Create a copy of an existing tracker, all future tracking is fully independent.
+     *
+     * @param tracker the instance to copy state from
+     */
+    AveragingProgressTracker(final AveragingProgressTracker tracker) {
+        super(tracker);
+        this.tasksOpenLimit = tracker.tasksOpenLimit;
+        this.noDelayThreshold = tracker.noDelayThreshold;
+    }
+
+    // Public shared access (read-only) accessor-like methods
+
+    /**
+     * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
+     *
+     * <p>This implementation returns zero delay if number of open tasks is half of limit or less.
+     * Else the delay is computed, aiming to keep number of open tasks at 3/4 of limit,
+     * assuming backend throughput remains constant.
+     *
+     * <p>As the number of open tasks approaches the limit,
+     * the computed delay increases, but it never exceeds defaultTicksPerTask.
+     * That means the actual number of open tasks can exceed the limit.
+     *
+     * @param now tick number corresponding to caller's present
+     * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
+     */
+    @Override
+    public long estimateIsolatedDelay(final long now) {
+        final long open = tasksOpen();
+        if (open <= noDelayThreshold) {
+            return 0L;
+        }
+        if (open >= tasksOpenLimit) {
+            return defaultTicksPerTask();
+        }
+
+        /*
+         * Calculate the task capacity relative to the limit on open tasks. In real terms this value can be
+         * in the open interval (0.0, 0.5).
+         */
+        final double relativeRemainingCapacity = 1.0 - (((double) open) / tasksOpenLimit);
+
+        /*
+         * Calculate delay coefficient. It increases in inverse proportion to relative remaining capacity, approaching
+         * infinity as remaining capacity approaches 0.0.
+         */
+        final double delayCoefficient = (0.5 - relativeRemainingCapacity) / relativeRemainingCapacity;
+        final long delay = (long) (ticksWorkedPerClosedTask(now) * delayCoefficient);
+
+        /*
+         * Cap the result to defaultTicksPerTask, since the calculated delay may overstep it.
+         */
+        return Math.min(delay, defaultTicksPerTask());
+    }
+}
index 64867e1c0e22be7b6cbba40e7f894d6993f2d1ce..aa986873d57e03f841e27986d24f57fa5e9cf402 100644 (file)
@@ -12,9 +12,16 @@ import java.util.Optional;
 
 @Beta
 public final class ConnectingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
+    /**
+     * A wild estimate on how deep a queue should be. Without having knowledge of the remote actor we can only
+     * guess its processing capabilities while we are doing initial buffering. With {@link AveragingProgressTracker}
+     * this boils down to a burst of up to 2000 messages before we start throttling.
+     */
+    private static final int TARGET_QUEUE_DEPTH = 4000;
+
     // Initial state, never instantiated externally
     ConnectingClientConnection(final ClientActorContext context, final Long cookie) {
-        super(context, cookie, new TransmitQueue.Halted());
+        super(context, cookie, new TransmitQueue.Halted(TARGET_QUEUE_DEPTH));
     }
 
     @Override
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java
new file mode 100644 (file)
index 0000000..024141a
--- /dev/null
@@ -0,0 +1,308 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.access.client;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for tracking throughput and computing delays when processing stream of tasks.
+ *
+ * <p>The idea is to improve throughput in a typical request-response scenario.
+ * A "frontend" is sending requests towards "backend", backend is sending responses back to fronted.
+ * Both frontend and backend may be realized by multiple Java threads,
+ * so there may be multiple requests not yet responded to.
+ * In terms of taks processing, frontend is "opening" tasks and backend is "closing" them.
+ * Latency of the backend may fluctuate wildly. To avoid backend running out of open tasks,
+ * there should be a queue of requests frontend can add to.
+ * In order to avoid excessive memore consumption, there should be a back-pressure mechanism
+ * which blocks the frontend threads for appropriate durations.
+ * Frontend can tolerate moderately delayed responses, but it only tolerates small block times.
+ *
+ * <p>An ideal back-pressure algorithm would keep the queue reasonably full,
+ * while fairly delaying frontend threads. In other words, backend idle time should be low,
+ * as well as frontend block time dispersion
+ * (as opposed to block time average, which is dictated by overall performance).
+ *
+ * <p>In order for an algorithm to compute reasonable wait times,
+ * various inputs can be useful, mostly related to timing of various stages of task processing.
+ * Methods of this class assume "enqueue and wait" usage.
+ * The delay computation is pessimistic, it expects each participating thread to enqueue another task
+ * as soon as its delay time allows.
+ *
+ * <p>This class is not thread safe, the callers are responsible for guarding against conflicting access.
+ * Time is measured in ticks (nanos), methods never look at current time, relying on {@code now} argument instead.
+ * Input data used for tracking is tightly coupled with TransitQueue#recordCompletion arguments.
+ *
+ * @author Vratko Polak
+ */
+// TODO: Would bulk methods be less taxing than a loop of single task calls?
+@NotThreadSafe
+abstract class ProgressTracker {
+    private static final Logger LOG = LoggerFactory.getLogger(ProgressTracker.class);
+
+    /**
+     * When no tasks has been closed yet, this will be used to estimate throughput.
+     */
+    private final long defaultTicksPerTask;
+
+    /**
+     * Number of tasks closed so far.
+     */
+    private long tasksClosed = 0;
+
+    /**
+     * Number of tasks so far, both open and closed.
+     */
+    private long tasksEncountered = 0;
+
+    /**
+     * The most recent tick number when the number of open tasks has become non-positive.
+     */
+    private long lastIdle = Long.MIN_VALUE;
+
+    /**
+     * The most recent tick number when a task has been closed.
+     */
+    private long lastClosed = Long.MIN_VALUE;
+
+    /**
+     * Tick number when the farthest known wait time is over.
+     */
+    private long nearestAllowed = Long.MIN_VALUE;
+
+    /**
+     * Number of ticks elapsed before lastIdle while there was at least one open task.
+     */
+    private long elapsedBeforeIdle = 0L;
+
+    // Constructors
+
+    /**
+     * Construct an idle tracker with specified ticks per task value to use as default.
+     *
+     * @param ticksPerTask value to use as default
+     */
+    ProgressTracker(final long ticksPerTask) {
+        Preconditions.checkArgument(ticksPerTask >= 0);
+        defaultTicksPerTask = ticksPerTask;
+    }
+
+    /**
+     * Construct a copy of an existing tracker, all future tracking is fully independent.
+     *
+     * @param tracker the instance to copy state from
+     */
+    ProgressTracker(final ProgressTracker tracker) {
+        this.defaultTicksPerTask = tracker.defaultTicksPerTask;
+        this.tasksClosed = tracker.tasksClosed;
+        this.tasksEncountered = tracker.tasksEncountered;
+        this.lastClosed = tracker.lastClosed;
+        this.lastIdle = tracker.lastIdle;
+        this.nearestAllowed = tracker.nearestAllowed;
+        this.elapsedBeforeIdle = tracker.elapsedBeforeIdle;
+    }
+
+    // Public shared access (read-only) accessor-like methods
+
+    /**
+     * Get the value of default ticks per task this instance was created to use.
+     *
+     * @return default ticks per task value
+     */
+    public final long defaultTicksPerTask() {
+        return defaultTicksPerTask;
+    }
+
+    /**
+     * Get number of tasks closed so far.
+     *
+     * @return number of tasks known to be finished already; the value never decreases
+     */
+    public final long tasksClosed() {
+        return tasksClosed;
+    }
+
+    /**
+     * Get umber of tasks so far, both open and closed.
+     *
+     * @return number of tasks encountered so far, open or finished; the value never decreases
+     */
+    public final long tasksEncountered() {
+        return tasksEncountered;
+    }
+
+    /**
+     * Get number of tasks currently open.
+     *
+     * @return number of tasks started but not finished yet
+     */
+    public final long tasksOpen() {
+        // TODO: Should we check the return value is non-negative?
+        return tasksEncountered - tasksClosed;
+    }
+
+    /**
+     * When idle, there are no open tasks so no progress is made.
+     *
+     * @return {@code true} if every encountered task is already closed, {@code false} otherwise
+     */
+    public boolean isIdle() {
+        return tasksClosed >= tasksEncountered;
+    }
+
+    /**
+     * Number of ticks elapsed (before now) since the last closed task while there was at least one open task.
+     *
+     * @param now tick number corresponding to caller's present
+     * @return number of ticks backend is neither idle nor responding
+     */
+    public long ticksStalling(final long now) {
+        return isIdle() ? 0 : Math.max(now, lastClosed) - lastClosed;
+    }
+
+    /**
+     * Number of ticks elapsed (before now) while there was at least one open task.
+     *
+     * @param now tick number corresponding to caller's present
+     * @return number of ticks there was at least one task open
+     */
+    public long ticksWorked(final long now) {
+        return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
+    }
+
+    /**
+     * One task is roughly estimated to take this long to close.
+     *
+     * @param now tick number corresponding to caller's present
+     * @return total ticks worked divided by closed tasks, or the default value if no closed tasks
+     */
+    public double ticksWorkedPerClosedTask(final long now) {
+        if (tasksClosed < 1) {
+            return defaultTicksPerTask;
+        }
+        return (double) ticksWorked(now) / tasksClosed;
+    }
+
+    /**
+     * Give an estimate of openTask() return value.
+     *
+     * <p>When the returned delay is positive, the caller thread should wait that time before opening additional task.
+     *
+     * <p>This method in general takes into account previously assigned delays to avoid overlaps.
+     *
+     * @param now tick number corresponding to caller's present
+     * @return delay (in ticks) after which another openTask() is fair to be called by the same thread again
+     */
+    public long estimateDelay(final long now) {
+        return estimateAllowed(now) - now;
+    }
+
+    /**
+     * Give an estimate of a tick number when there will be no accumulated delays.
+     *
+     * <p>The delays accumulated include one more open task.
+     * Basically, the return value corresponds to openTask() return value,
+     * but this gives an absolute time, instead of delay relative to now.
+     *
+     * @param now tick number corresponding to caller's present
+     * @return estimated tick number when all threads with opened tasks are done waiting
+     */
+    public long estimateAllowed(final long now) {
+        return Math.max(now, nearestAllowed) + estimateIsolatedDelay(now);
+    }
+
+    // State-altering public methods.
+
+    /**
+     * Track a task is being closed.
+     *
+     * @param now tick number corresponding to caller's present
+     * @param enqueuedTicks see TransitQueue#recordCompletion
+     * @param transmitTicks see TransitQueue#recordCompletion
+     * @param execNanos see TransitQueue#recordCompletion
+     */
+    public void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
+        if (isIdle()) {
+            LOG.info("Attempted to close a task while no tasks are open");
+        } else {
+            protectedCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
+        }
+    }
+
+    /**
+     * Track a task that is being opened.
+     *
+     * @param now tick number corresponding to caller's present
+     * @return number of ticks (nanos) the caller thread should wait before opening another task
+     */
+    public long openTask(final long now) {
+        protectedOpenTask(now);
+        return reserveDelay(now);
+    }
+
+    // Internal state-altering methods. Protected instead of private,
+    // allowing subclasses to weaken ad-hoc invariants of current implementation.
+
+    /**
+     * Compute the next delay and update nearestAllowed value accordingly.
+     *
+     * @param now tick number corresponding to caller's present
+     * @return number of ticks (nanos) the caller thread should wait before opening another task
+     */
+    protected long reserveDelay(final long now) {
+        nearestAllowed = estimateAllowed(now);
+        return nearestAllowed - now;
+    }
+
+    /**
+     * Track a task is being closed.
+     *
+     * <p>This method does not verify there was any task open.
+     * This call can empty the collection of open tasks, that special case should be handled.
+     *
+     * @param now tick number corresponding to caller's present
+     * @param enqueuedTicks see TransitQueue#recordCompletion
+     * @param transmitTicks see TransitQueue#recordCompletion
+     * @param execNanos see TransitQueue#recordCompletion
+     */
+    protected void protectedCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
+                final long execNanos) {
+        tasksClosed++;
+        lastClosed = now;
+        if (isIdle()) {
+            elapsedBeforeIdle += now - lastIdle;
+        }
+    }
+
+    /**
+     * Track a task is being opened.
+     *
+     * <p>This method does not aggregate delays, allowing the caller to sidestep the throttling.
+     * This call can make the collection of open tasks non-empty, that special case should be handled.
+     *
+     * @param now tick number corresponding to caller's present
+     */
+    protected void protectedOpenTask(final long now) {
+        if (isIdle()) {
+            lastIdle = Math.max(now, lastIdle);
+        }
+        tasksEncountered++;
+    }
+
+    /**
+     * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
+     *
+     * @param now tick number corresponding to caller's present
+     * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
+     */
+    abstract long estimateIsolatedDelay(final long now);
+}
index d97ecd93f30a982f31fb57e7f9757540c17924b1..2def9a10152663b50c4e829436900b89955f42d9 100644 (file)
@@ -15,6 +15,7 @@ final class SimpleReconnectForwarder extends ReconnectForwarder {
 
     @Override
     protected void forwardEntry(final ConnectionEntry entry, final long now) {
+        // We are ignoring requested delay, as we have already paid the admission delay
         successor().enqueueEntry(entry, now);
     }
 }
index e1c5589004ec6685b086efacc32972d890516d8b..4a1b3a2f293a499a64e805250a5a3776222caa67 100644 (file)
@@ -52,6 +52,10 @@ import org.slf4j.LoggerFactory;
 @NotThreadSafe
 abstract class TransmitQueue {
     static final class Halted extends TransmitQueue {
+        Halted(final int targetDepth) {
+            super(targetDepth);
+        }
+
         @Override
         int canTransmitCount(final int inflightSize) {
             return 0;
@@ -67,7 +71,8 @@ abstract class TransmitQueue {
         private final BackendInfo backend;
         private long nextTxSequence;
 
-        Transmitting(final BackendInfo backend) {
+        Transmitting(final int targetDepth, final BackendInfo backend) {
+            super(targetDepth);
             this.backend = Preconditions.checkNotNull(backend);
         }
 
@@ -92,18 +97,22 @@ abstract class TransmitQueue {
 
     private final ArrayDeque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
     private final ArrayDeque<ConnectionEntry> pending = new ArrayDeque<>();
-
+    private final ProgressTracker tracker;
     private ReconnectForwarder successor;
 
+    TransmitQueue(final int targetDepth) {
+        tracker = new AveragingProgressTracker(targetDepth);
+    }
+
     final Iterable<ConnectionEntry> asIterable() {
         return Iterables.concat(inflight, pending);
     }
 
-    private void recordCompletion(final long now, final long enqueuedTicks, final long transmitTicks,
-            final long execNanos) {
-        // TODO: record
+    final long ticksStalling(final long now) {
+        return tracker.ticksStalling(now);
     }
 
+    // If a matching request was found, this will track a task was closed.
     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
         if (maybeEntry == null) {
@@ -117,7 +126,7 @@ abstract class TransmitQueue {
         }
 
         final TransmittedConnectionEntry entry = maybeEntry.get();
-        recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
+        tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
 
         // We have freed up a slot, try to transmit something
         int toSend = canTransmitCount(inflight.size());
@@ -135,24 +144,35 @@ abstract class TransmitQueue {
         return Optional.of(entry);
     }
 
-    final void enqueue(final ConnectionEntry entry, final long now) {
+    /**
+     * Enqueue an entry, possibly also transmitting it.
+     *
+     * @return Delay to be forced on the calling thread, in nanoseconds.
+     */
+    final long enqueue(final ConnectionEntry entry, final long now) {
         if (successor != null) {
             successor.forwardEntry(entry, now);
-            return;
+            return 0;
         }
 
+        // Reserve an entry before we do anything that can fail
+        final long delay = tracker.openTask(now);
         if (canTransmitCount(inflight.size()) <= 0) {
             LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
             pending.add(entry);
-            return;
+        } else {
+            // We are not thread-safe and are supposed to be externally-guarded,
+            // hence send-before-record should be fine.
+            // This needs to be revisited if the external guards are lowered.
+            inflight.offer(transmit(entry, now));
+            LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
         }
-
-        // We are not thread-safe and are supposed to be externally-guarded, hence send-before-record should be fine.
-        // This needs to be revisited if the external guards are lowered.
-        inflight.offer(transmit(entry, now));
-        LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
+        return delay;
     }
 
+    /**
+     * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
+     */
     abstract int canTransmitCount(int inflightSize);
 
     abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);