From: Vratko Polak Date: Thu, 22 Dec 2016 09:39:00 +0000 (+0100) Subject: Bug 5280: Add ProgressTracker X-Git-Tag: release/carbon~313 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=refs%2Fchanges%2F53%2F49053%2F18 Bug 5280: Add ProgressTracker ProgressTracker determines the tracking delay required to keep the outbound queue reasonably full. Unlike Guava's RateLimiter, we can assume cooperative callers, hence we can use an enqueue-and-wait strategy as appropriate. This lowers contention in multi-threaded access, as the wait part of the cycle is spent outside the lock. Change-Id: Iaea65c171455b89d7117431599ebc65fe0a4f19a Signed-off-by: Vratko Polak Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 7dc150e403..ac4ac785d6 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -50,9 +50,6 @@ public abstract class AbstractClientConnection { private final TransmitQueue queue; private final Long cookie; - // Updated from actor thread only - private long lastProgress; - private volatile RequestException poisoned; // Do not allow subclassing outside of this package @@ -61,15 +58,13 @@ public abstract class AbstractClientConnection { this.context = Preconditions.checkNotNull(context); this.cookie = Preconditions.checkNotNull(cookie); this.queue = Preconditions.checkNotNull(queue); - this.lastProgress = readTime(); } // Do not allow subclassing outside of this package - AbstractClientConnection(final AbstractClientConnection oldConnection) { + 
AbstractClientConnection(final AbstractClientConnection oldConnection, final int targetQueueSize) { this.context = oldConnection.context; this.cookie = oldConnection.cookie; - this.lastProgress = oldConnection.lastProgress; - this.queue = new TransmitQueue.Halted(); + this.queue = new TransmitQueue.Halted(targetQueueSize); } public final ClientActorContext context() { @@ -88,6 +83,9 @@ public abstract class AbstractClientConnection { * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke * from any thread. * + *

This method may put the caller thread to sleep in order to throttle the request rate. + * The callback may be called before the sleep finishes. + * * @param request Request to send * @param callback Callback to invoke */ @@ -98,13 +96,7 @@ public abstract class AbstractClientConnection { } final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime()); - - lock.lock(); - try { - queue.enqueue(entry, entry.getEnqueuedTicks()); - } finally { - lock.unlock(); - } + enqueueAndWait(entry, entry.getEnqueuedTicks()); } public abstract Optional getBackendInfo(); @@ -132,15 +124,24 @@ public abstract class AbstractClientConnection { return context.ticker().read(); } - final void enqueueEntry(final ConnectionEntry entry, final long now) { + final long enqueueEntry(final ConnectionEntry entry, final long now) { lock.lock(); try { - queue.enqueue(entry, now); + return queue.enqueue(entry, now); } finally { lock.unlock(); } } + final void enqueueAndWait(final ConnectionEntry entry, final long now) { + final long delay = enqueueEntry(entry, now); + try { + TimeUnit.NANOSECONDS.sleep(delay); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping"); + } + } + /** * Schedule a timer to fire on the actor thread after a delay. * @@ -165,16 +166,15 @@ public abstract class AbstractClientConnection { lock.lock(); try { final long now = readTime(); - if (!queue.isEmpty()) { - final long ticksSinceProgress = now - lastProgress; - if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { - LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, - TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress)); - - lockedPoison(new NoProgressException(ticksSinceProgress)); - current.removeConnection(this); - return current; - } + // The following line is only reliable when queue is not forwarding, but such state should not last long. 
+ final long ticksSinceProgress = queue.ticksStalling(now); + if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { + LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, + TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress)); + + lockedPoison(new NoProgressException(ticksSinceProgress)); + current.removeConnection(this); + return current; } // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward. @@ -222,13 +222,13 @@ public abstract class AbstractClientConnection { return Optional.empty(); } - final long delay = head.getEnqueuedTicks() - now + REQUEST_TIMEOUT_NANOS; - if (delay <= 0) { - LOG.debug("Connection {} timed out", this); + final long beenOpen = now - head.getEnqueuedTicks(); + if (beenOpen >= REQUEST_TIMEOUT_NANOS) { + LOG.debug("Connection {} has a request not completed for {} nanoseconds, timing out", this, beenOpen); return null; } - return Optional.of(FiniteDuration.apply(delay, TimeUnit.NANOSECONDS)); + return Optional.of(FiniteDuration.apply(REQUEST_TIMEOUT_NANOS - beenOpen, TimeUnit.NANOSECONDS)); } final void poison(final RequestException cause) { @@ -267,7 +267,5 @@ public abstract class AbstractClientConnection { LOG.debug("Completing {} with {}", entry, envelope); entry.complete(envelope.getMessage()); } - - lastProgress = readTime(); } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java index 15da294ec9..8d9ed24043 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java @@ -12,25 
+12,42 @@ import java.util.Optional; /** * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its - * sublcasses. It allows us to share some code. + * subclasses. It allows us to share some code. * * @author Robert Varga * * @param Concrete {@link BackendInfo} type */ abstract class AbstractReceivingClientConnection extends AbstractClientConnection { + /** + * Multiplication factor applied to remote's advertised limit on outstanding messages. Our default + * rate-limiting strategy in {@link AveragingProgressTracker} does not penalize threads as long as we have not + * reached half of the target. + * + *

+ * By multiplying the advertised maximum by four, our queue steady-state should end up with: + * - the backend pipeline being full, + * - another full batch of messages being in the queue while not paying any throttling cost + * - another 2 full batches of messages with incremental throttling cost + */ + private static final int MESSAGE_QUEUE_FACTOR = 4; + private final T backend; AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) { - super(context, cookie, new TransmitQueue.Transmitting(backend)); + super(context, cookie, new TransmitQueue.Transmitting(targetQueueSize(backend), backend)); this.backend = Preconditions.checkNotNull(backend); } AbstractReceivingClientConnection(final AbstractReceivingClientConnection oldConnection) { - super(oldConnection); + super(oldConnection, targetQueueSize(oldConnection.backend)); this.backend = oldConnection.backend; } + private static int targetQueueSize(final BackendInfo backend) { + return backend.getMaxMessages() * MESSAGE_QUEUE_FACTOR; + } + @Override public final Optional getBackendInfo() { return Optional.of(backend); diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTracker.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTracker.java new file mode 100644 index 0000000000..1c6210a920 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AveragingProgressTracker.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.access.client; + +import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.NotThreadSafe; + +/** + * A ProgressTracker subclass which uses {@code ticksWorkedPerClosedTask} to compute delays. + * + *

This class uses {@code tasksOpenLimit} as a (weak) limit: + * as the number of open tasks approaches that value, the computed delays increase. + * + *

In order to keep delays from rising to unreasonably high values, + * a maximal delay (per task) value is never exceeded. + * + *

On the other hand, there is no delay when number of open tasks is half the limit or less, + * in order to prevent backend from running out of tasks while there may be waiting frontend threads. + * + * @author Vratko Polak + */ +@NotThreadSafe +final class AveragingProgressTracker extends ProgressTracker { + private static final long DEFAULT_TICKS_PER_TASK = TimeUnit.MILLISECONDS.toNanos(500); + + /** + * The implementation will avoid having more that this number of tasks open. + */ + private final long tasksOpenLimit; + + /** + * We do not delay tasks until their count hits this threshold. + */ + private final long noDelayThreshold; + + /** + * Create an idle tracker with limit and specified ticks per task value to use as default. + * + * @param limit of open tasks to avoid exceeding + * @param ticksPerTask value to use as default + */ + private AveragingProgressTracker(final int limit, final long ticksPerTask) { + super(ticksPerTask); + tasksOpenLimit = limit; + noDelayThreshold = limit / 2; + } + + /** + * Create a default idle tracker with given limit. + * + * @param limit of open tasks to avoid exceeding + */ + AveragingProgressTracker(final int limit) { + this(limit, DEFAULT_TICKS_PER_TASK); + } + + /** + * Create a copy of an existing tracker, all future tracking is fully independent. + * + * @param tracker the instance to copy state from + */ + AveragingProgressTracker(final AveragingProgressTracker tracker) { + super(tracker); + this.tasksOpenLimit = tracker.tasksOpenLimit; + this.noDelayThreshold = tracker.noDelayThreshold; + } + + // Public shared access (read-only) accessor-like methods + + /** + * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored. + * + *

This implementation returns zero delay if the number of open tasks is half of the limit or less. + * Otherwise the delay is computed, aiming to keep the number of open tasks at 3/4 of the limit, + * assuming backend throughput remains constant. + * + *

As the number of open tasks approaches the limit, + * the computed delay increases, but it never exceeds defaultTicksPerTask. + * That means the actual number of open tasks can exceed the limit. + * + * @param now tick number corresponding to caller's present + * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again + */ + @Override + public long estimateIsolatedDelay(final long now) { + final long open = tasksOpen(); + if (open <= noDelayThreshold) { + return 0L; + } + if (open >= tasksOpenLimit) { + return defaultTicksPerTask(); + } + + /* + * Calculate the task capacity relative to the limit on open tasks. In real terms this value can be + * in the open interval (0.0, 0.5). + */ + final double relativeRemainingCapacity = 1.0 - (((double) open) / tasksOpenLimit); + + /* + * Calculate delay coefficient. It increases in inverse proportion to relative remaining capacity, approaching + * infinity as remaining capacity approaches 0.0. + */ + final double delayCoefficient = (0.5 - relativeRemainingCapacity) / relativeRemainingCapacity; + final long delay = (long) (ticksWorkedPerClosedTask(now) * delayCoefficient); + + /* + * Cap the result to defaultTicksPerTask, since the calculated delay may overstep it. 
+ */ + return Math.min(delay, defaultTicksPerTask()); + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java index 64867e1c0e..aa986873d5 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java @@ -12,9 +12,16 @@ import java.util.Optional; @Beta public final class ConnectingClientConnection extends AbstractClientConnection { + /** + * A wild estimate on how deep a queue should be. Without having knowledge of the remote actor we can only + * guess its processing capabilities while we are doing initial buffering. With {@link AveragingProgressTracker} + * this boils down to a burst of up to 2000 messages before we start throttling. + */ + private static final int TARGET_QUEUE_DEPTH = 4000; + // Initial state, never instantiated externally ConnectingClientConnection(final ClientActorContext context, final Long cookie) { - super(context, cookie, new TransmitQueue.Halted()); + super(context, cookie, new TransmitQueue.Halted(TARGET_QUEUE_DEPTH)); } @Override diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java new file mode 100644 index 0000000000..024141a040 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java @@ -0,0 +1,308 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.access.client; + +import com.google.common.base.Preconditions; +import javax.annotation.concurrent.NotThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for tracking throughput and computing delays when processing stream of tasks. + * + *

The idea is to improve throughput in a typical request-response scenario. + * A "frontend" is sending requests towards "backend", backend is sending responses back to frontend. + * Both frontend and backend may be realized by multiple Java threads, + * so there may be multiple requests not yet responded to. + * In terms of task processing, frontend is "opening" tasks and backend is "closing" them. + * Latency of the backend may fluctuate wildly. To avoid backend running out of open tasks, + * there should be a queue of requests frontend can add to. + * In order to avoid excessive memory consumption, there should be a back-pressure mechanism + * which blocks the frontend threads for appropriate durations. + * Frontend can tolerate moderately delayed responses, but it only tolerates small block times. + * + *

An ideal back-pressure algorithm would keep the queue reasonably full, + * while fairly delaying frontend threads. In other words, backend idle time should be low, + * as well as frontend block time dispersion + * (as opposed to block time average, which is dictated by overall performance). + * + *

In order for an algorithm to compute reasonable wait times, + * various inputs can be useful, mostly related to timing of various stages of task processing. + * Methods of this class assume "enqueue and wait" usage. + * The delay computation is pessimistic: it expects each participating thread to enqueue another task + * as soon as its delay time allows. + * + *

This class is not thread safe, the callers are responsible for guarding against conflicting access. + * Time is measured in ticks (nanos), methods never look at current time, relying on {@code now} argument instead. + * Input data used for tracking is tightly coupled with TransitQueue#recordCompletion arguments. + * + * @author Vratko Polak + */ +// TODO: Would bulk methods be less taxing than a loop of single task calls? +@NotThreadSafe +abstract class ProgressTracker { + private static final Logger LOG = LoggerFactory.getLogger(ProgressTracker.class); + + /** + * When no tasks has been closed yet, this will be used to estimate throughput. + */ + private final long defaultTicksPerTask; + + /** + * Number of tasks closed so far. + */ + private long tasksClosed = 0; + + /** + * Number of tasks so far, both open and closed. + */ + private long tasksEncountered = 0; + + /** + * The most recent tick number when the number of open tasks has become non-positive. + */ + private long lastIdle = Long.MIN_VALUE; + + /** + * The most recent tick number when a task has been closed. + */ + private long lastClosed = Long.MIN_VALUE; + + /** + * Tick number when the farthest known wait time is over. + */ + private long nearestAllowed = Long.MIN_VALUE; + + /** + * Number of ticks elapsed before lastIdle while there was at least one open task. + */ + private long elapsedBeforeIdle = 0L; + + // Constructors + + /** + * Construct an idle tracker with specified ticks per task value to use as default. + * + * @param ticksPerTask value to use as default + */ + ProgressTracker(final long ticksPerTask) { + Preconditions.checkArgument(ticksPerTask >= 0); + defaultTicksPerTask = ticksPerTask; + } + + /** + * Construct a copy of an existing tracker, all future tracking is fully independent. 
+ * + * @param tracker the instance to copy state from + */ + ProgressTracker(final ProgressTracker tracker) { + this.defaultTicksPerTask = tracker.defaultTicksPerTask; + this.tasksClosed = tracker.tasksClosed; + this.tasksEncountered = tracker.tasksEncountered; + this.lastClosed = tracker.lastClosed; + this.lastIdle = tracker.lastIdle; + this.nearestAllowed = tracker.nearestAllowed; + this.elapsedBeforeIdle = tracker.elapsedBeforeIdle; + } + + // Public shared access (read-only) accessor-like methods + + /** + * Get the value of default ticks per task this instance was created to use. + * + * @return default ticks per task value + */ + public final long defaultTicksPerTask() { + return defaultTicksPerTask; + } + + /** + * Get number of tasks closed so far. + * + * @return number of tasks known to be finished already; the value never decreases + */ + public final long tasksClosed() { + return tasksClosed; + } + + /** + * Get umber of tasks so far, both open and closed. + * + * @return number of tasks encountered so far, open or finished; the value never decreases + */ + public final long tasksEncountered() { + return tasksEncountered; + } + + /** + * Get number of tasks currently open. + * + * @return number of tasks started but not finished yet + */ + public final long tasksOpen() { + // TODO: Should we check the return value is non-negative? + return tasksEncountered - tasksClosed; + } + + /** + * When idle, there are no open tasks so no progress is made. + * + * @return {@code true} if every encountered task is already closed, {@code false} otherwise + */ + public boolean isIdle() { + return tasksClosed >= tasksEncountered; + } + + /** + * Number of ticks elapsed (before now) since the last closed task while there was at least one open task. + * + * @param now tick number corresponding to caller's present + * @return number of ticks backend is neither idle nor responding + */ + public long ticksStalling(final long now) { + return isIdle() ? 
0 : Math.max(now, lastClosed) - lastClosed; + } + + /** + * Number of ticks elapsed (before now) while there was at least one open task. + * + * @param now tick number corresponding to caller's present + * @return number of ticks there was at least one task open + */ + public long ticksWorked(final long now) { + return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle; + } + + /** + * One task is roughly estimated to take this long to close. + * + * @param now tick number corresponding to caller's present + * @return total ticks worked divided by closed tasks, or the default value if no closed tasks + */ + public double ticksWorkedPerClosedTask(final long now) { + if (tasksClosed < 1) { + return defaultTicksPerTask; + } + return (double) ticksWorked(now) / tasksClosed; + } + + /** + * Give an estimate of openTask() return value. + * + *

When the returned delay is positive, the caller thread should wait for that time before opening an additional task. + * + *

This method in general takes into account previously assigned delays to avoid overlaps. + * + * @param now tick number corresponding to caller's present + * @return delay (in ticks) after which another openTask() is fair to be called by the same thread again + */ + public long estimateDelay(final long now) { + return estimateAllowed(now) - now; + } + + /** + * Give an estimate of a tick number when there will be no accumulated delays. + * + *

The delays accumulated include one more open task. + * Basically, the return value corresponds to openTask() return value, + * but this gives an absolute time, instead of delay relative to now. + * + * @param now tick number corresponding to caller's present + * @return estimated tick number when all threads with opened tasks are done waiting + */ + public long estimateAllowed(final long now) { + return Math.max(now, nearestAllowed) + estimateIsolatedDelay(now); + } + + // State-altering public methods. + + /** + * Track a task is being closed. + * + * @param now tick number corresponding to caller's present + * @param enqueuedTicks see TransitQueue#recordCompletion + * @param transmitTicks see TransitQueue#recordCompletion + * @param execNanos see TransitQueue#recordCompletion + */ + public void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) { + if (isIdle()) { + LOG.info("Attempted to close a task while no tasks are open"); + } else { + protectedCloseTask(now, enqueuedTicks, transmitTicks, execNanos); + } + } + + /** + * Track a task that is being opened. + * + * @param now tick number corresponding to caller's present + * @return number of ticks (nanos) the caller thread should wait before opening another task + */ + public long openTask(final long now) { + protectedOpenTask(now); + return reserveDelay(now); + } + + // Internal state-altering methods. Protected instead of private, + // allowing subclasses to weaken ad-hoc invariants of current implementation. + + /** + * Compute the next delay and update nearestAllowed value accordingly. + * + * @param now tick number corresponding to caller's present + * @return number of ticks (nanos) the caller thread should wait before opening another task + */ + protected long reserveDelay(final long now) { + nearestAllowed = estimateAllowed(now); + return nearestAllowed - now; + } + + /** + * Track a task is being closed. + * + *

This method does not verify there was any task open. + * This call can empty the collection of open tasks, that special case should be handled. + * + * @param now tick number corresponding to caller's present + * @param enqueuedTicks see TransitQueue#recordCompletion + * @param transmitTicks see TransitQueue#recordCompletion + * @param execNanos see TransitQueue#recordCompletion + */ + protected void protectedCloseTask(final long now, final long enqueuedTicks, final long transmitTicks, + final long execNanos) { + tasksClosed++; + lastClosed = now; + if (isIdle()) { + elapsedBeforeIdle += now - lastIdle; + } + } + + /** + * Track a task is being opened. + * + *

This method does not aggregate delays, allowing the caller to sidestep the throttling. + * This call can make the collection of open tasks non-empty, that special case should be handled. + * + * @param now tick number corresponding to caller's present + */ + protected void protectedOpenTask(final long now) { + if (isIdle()) { + lastIdle = Math.max(now, lastIdle); + } + tasksEncountered++; + } + + /** + * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored. + * + * @param now tick number corresponding to caller's present + * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again + */ + abstract long estimateIsolatedDelay(final long now); +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java index d97ecd93f3..2def9a1015 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java @@ -15,6 +15,7 @@ final class SimpleReconnectForwarder extends ReconnectForwarder { @Override protected void forwardEntry(final ConnectionEntry entry, final long now) { + // We are ignoring requested delay, as we have already paid the admission delay successor().enqueueEntry(entry, now); } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index e1c5589004..4a1b3a2f29 100644 --- 
a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -52,6 +52,10 @@ import org.slf4j.LoggerFactory; @NotThreadSafe abstract class TransmitQueue { static final class Halted extends TransmitQueue { + Halted(final int targetDepth) { + super(targetDepth); + } + @Override int canTransmitCount(final int inflightSize) { return 0; @@ -67,7 +71,8 @@ abstract class TransmitQueue { private final BackendInfo backend; private long nextTxSequence; - Transmitting(final BackendInfo backend) { + Transmitting(final int targetDepth, final BackendInfo backend) { + super(targetDepth); this.backend = Preconditions.checkNotNull(backend); } @@ -92,18 +97,22 @@ abstract class TransmitQueue { private final ArrayDeque inflight = new ArrayDeque<>(); private final ArrayDeque pending = new ArrayDeque<>(); - + private final ProgressTracker tracker; private ReconnectForwarder successor; + TransmitQueue(final int targetDepth) { + tracker = new AveragingProgressTracker(targetDepth); + } + final Iterable asIterable() { return Iterables.concat(inflight, pending); } - private void recordCompletion(final long now, final long enqueuedTicks, final long transmitTicks, - final long execNanos) { - // TODO: record + final long ticksStalling(final long now) { + return tracker.ticksStalling(now); } + // If a matching request was found, this will track a task was closed. 
final Optional complete(final ResponseEnvelope envelope, final long now) { Optional maybeEntry = findMatchingEntry(inflight, envelope); if (maybeEntry == null) { @@ -117,7 +126,7 @@ abstract class TransmitQueue { } final TransmittedConnectionEntry entry = maybeEntry.get(); - recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); + tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); // We have freed up a slot, try to transmit something int toSend = canTransmitCount(inflight.size()); @@ -135,24 +144,35 @@ abstract class TransmitQueue { return Optional.of(entry); } - final void enqueue(final ConnectionEntry entry, final long now) { + /** + * Enqueue an entry, possibly also transmitting it. + * + * @return Delay to be forced on the calling thread, in nanoseconds. + */ + final long enqueue(final ConnectionEntry entry, final long now) { if (successor != null) { successor.forwardEntry(entry, now); - return; + return 0; } + // Reserve an entry before we do anything that can fail + final long delay = tracker.openTask(now); if (canTransmitCount(inflight.size()) <= 0) { LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest()); pending.add(entry); - return; + } else { + // We are not thread-safe and are supposed to be externally-guarded, + // hence send-before-record should be fine. + // This needs to be revisited if the external guards are lowered. + inflight.offer(transmit(entry, now)); + LOG.debug("Sent request {} on queue {}", entry.getRequest(), this); } - - // We are not thread-safe and are supposed to be externally-guarded, hence send-before-record should be fine. - // This needs to be revisited if the external guards are lowered. 
- inflight.offer(transmit(entry, now)); - LOG.debug("Sent request {} on queue {}", entry.getRequest(), this); + return delay; } + /** + * Return the number of entries which can be transmitted assuming the supplied in-flight queue size. + */ abstract int canTransmitCount(int inflightSize); abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);