private final TransmitQueue queue;
private final Long cookie;
- // Updated from actor thread only
- private long lastProgress;
-
private volatile RequestException poisoned;
// Do not allow subclassing outside of this package
this.context = Preconditions.checkNotNull(context);
this.cookie = Preconditions.checkNotNull(cookie);
this.queue = Preconditions.checkNotNull(queue);
- this.lastProgress = readTime();
}
// Do not allow subclassing outside of this package
- AbstractClientConnection(final AbstractClientConnection<T> oldConnection) {
+ AbstractClientConnection(final AbstractClientConnection<T> oldConnection, final int targetQueueSize) {
this.context = oldConnection.context;
this.cookie = oldConnection.cookie;
- this.lastProgress = oldConnection.lastProgress;
- this.queue = new TransmitQueue.Halted();
+ this.queue = new TransmitQueue.Halted(targetQueueSize);
}
public final ClientActorContext context() {
* Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
* from any thread.
*
+ * <p>This method may put the caller thread to sleep in order to throttle the request rate.
+ * The callback may be called before the sleep finishes.
+ *
* @param request Request to send
* @param callback Callback to invoke
*/
}
final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
-
- lock.lock();
- try {
- queue.enqueue(entry, entry.getEnqueuedTicks());
- } finally {
- lock.unlock();
- }
+ enqueueAndWait(entry, entry.getEnqueuedTicks());
}
public abstract Optional<T> getBackendInfo();
return context.ticker().read();
}
- final void enqueueEntry(final ConnectionEntry entry, final long now) {
+ final long enqueueEntry(final ConnectionEntry entry, final long now) {
lock.lock();
try {
- queue.enqueue(entry, now);
+ return queue.enqueue(entry, now);
} finally {
lock.unlock();
}
}
+ final void enqueueAndWait(final ConnectionEntry entry, final long now) {
+ final long delay = enqueueEntry(entry, now);
+ try {
+ TimeUnit.NANOSECONDS.sleep(delay);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while sleeping");
+ }
+ }
+
/**
* Schedule a timer to fire on the actor thread after a delay.
*
lock.lock();
try {
final long now = readTime();
- if (!queue.isEmpty()) {
- final long ticksSinceProgress = now - lastProgress;
- if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
- LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
- TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
-
- lockedPoison(new NoProgressException(ticksSinceProgress));
- current.removeConnection(this);
- return current;
- }
+ // The following line is only reliable when queue is not forwarding, but such state should not last long.
+ final long ticksSinceProgress = queue.ticksStalling(now);
+ if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
+ LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
+ TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+
+ lockedPoison(new NoProgressException(ticksSinceProgress));
+ current.removeConnection(this);
+ return current;
}
// Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
return Optional.empty();
}
- final long delay = head.getEnqueuedTicks() - now + REQUEST_TIMEOUT_NANOS;
- if (delay <= 0) {
- LOG.debug("Connection {} timed out", this);
+ final long beenOpen = now - head.getEnqueuedTicks();
+ if (beenOpen >= REQUEST_TIMEOUT_NANOS) {
+ LOG.debug("Connection {} has a request not completed for {} nanoseconds, timing out", this, beenOpen);
return null;
}
- return Optional.of(FiniteDuration.apply(delay, TimeUnit.NANOSECONDS));
+ return Optional.of(FiniteDuration.apply(REQUEST_TIMEOUT_NANOS - beenOpen, TimeUnit.NANOSECONDS));
}
final void poison(final RequestException cause) {
LOG.debug("Completing {} with {}", entry, envelope);
entry.complete(envelope.getMessage());
}
-
- lastProgress = readTime();
}
}
/**
* Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its
- * sublcasses. It allows us to share some code.
+ * subclasses. It allows us to share some code.
*
* @author Robert Varga
*
* @param <T> Concrete {@link BackendInfo} type
*/
abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
+ /**
+ * Multiplication factor applied to remote's advertised limit on outstanding messages. Our default strategy
+ * rate-limiting strategy in {@link AveragingProgressTracker} does not penalize threads as long as we have not
+ * reached half of the target.
+ *
+ * <p>
+ * By multiplying the advertised maximum by four, our queue steady-state should end up with:
+ * - the backend pipeline being full,
+ * - another full batch of messages being in the queue while not paying any throttling cost
+ * - another 2 full batches of messages with incremental throttling cost
+ */
+ private static final int MESSAGE_QUEUE_FACTOR = 4;
+
private final T backend;
AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
- super(context, cookie, new TransmitQueue.Transmitting(backend));
+ super(context, cookie, new TransmitQueue.Transmitting(targetQueueSize(backend), backend));
this.backend = Preconditions.checkNotNull(backend);
}
AbstractReceivingClientConnection(final AbstractReceivingClientConnection<T> oldConnection) {
- super(oldConnection);
+ super(oldConnection, targetQueueSize(oldConnection.backend));
this.backend = oldConnection.backend;
}
+ private static int targetQueueSize(final BackendInfo backend) {
+ return backend.getMaxMessages() * MESSAGE_QUEUE_FACTOR;
+ }
+
@Override
public final Optional<T> getBackendInfo() {
return Optional.of(backend);
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.access.client;
+
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * A ProgressTracker subclass which uses {@code ticksWorkedPerClosedTask} to compute delays.
+ *
+ * <p>This class has {@code tasksOpenLimit} used as a (weak) limit,
+ * as number of open tasks approaches that value, delays computed are increasing.
+ *
+ * <p>In order to keep delays from raising to unreasonably high values,
+ * a maximal delay (per task) value is never exceeded.
+ *
+ * <p>On the other hand, there is no delay when number of open tasks is half the limit or less,
+ * in order to prevent backend from running out of tasks while there may be waiting frontend threads.
+ *
+ * @author Vratko Polak
+ */
+@NotThreadSafe
+final class AveragingProgressTracker extends ProgressTracker {
+ private static final long DEFAULT_TICKS_PER_TASK = TimeUnit.MILLISECONDS.toNanos(500);
+
+ /**
+ * The implementation will avoid having more that this number of tasks open.
+ */
+ private final long tasksOpenLimit;
+
+ /**
+ * We do not delay tasks until their count hits this threshold.
+ */
+ private final long noDelayThreshold;
+
+ /**
+ * Create an idle tracker with limit and specified ticks per task value to use as default.
+ *
+ * @param limit of open tasks to avoid exceeding
+ * @param ticksPerTask value to use as default
+ */
+ private AveragingProgressTracker(final int limit, final long ticksPerTask) {
+ super(ticksPerTask);
+ tasksOpenLimit = limit;
+ noDelayThreshold = limit / 2;
+ }
+
+ /**
+ * Create a default idle tracker with given limit.
+ *
+ * @param limit of open tasks to avoid exceeding
+ */
+ AveragingProgressTracker(final int limit) {
+ this(limit, DEFAULT_TICKS_PER_TASK);
+ }
+
+ /**
+ * Create a copy of an existing tracker, all future tracking is fully independent.
+ *
+ * @param tracker the instance to copy state from
+ */
+ AveragingProgressTracker(final AveragingProgressTracker tracker) {
+ super(tracker);
+ this.tasksOpenLimit = tracker.tasksOpenLimit;
+ this.noDelayThreshold = tracker.noDelayThreshold;
+ }
+
+ // Public shared access (read-only) accessor-like methods
+
+ /**
+ * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
+ *
+ * <p>This implementation returns zero delay if number of open tasks is half of limit or less.
+ * Else the delay is computed, aiming to keep number of open tasks at 3/4 of limit,
+ * assuming backend throughput remains constant.
+ *
+ * <p>As the number of open tasks approaches the limit,
+ * the computed delay increases, but it never exceeds defaultTicksPerTask.
+ * That means the actual number of open tasks can exceed the limit.
+ *
+ * @param now tick number corresponding to caller's present
+ * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
+ */
+ @Override
+ public long estimateIsolatedDelay(final long now) {
+ final long open = tasksOpen();
+ if (open <= noDelayThreshold) {
+ return 0L;
+ }
+ if (open >= tasksOpenLimit) {
+ return defaultTicksPerTask();
+ }
+
+ /*
+ * Calculate the task capacity relative to the limit on open tasks. In real terms this value can be
+ * in the open interval (0.0, 0.5).
+ */
+ final double relativeRemainingCapacity = 1.0 - (((double) open) / tasksOpenLimit);
+
+ /*
+ * Calculate delay coefficient. It increases in inverse proportion to relative remaining capacity, approaching
+ * infinity as remaining capacity approaches 0.0.
+ */
+ final double delayCoefficient = (0.5 - relativeRemainingCapacity) / relativeRemainingCapacity;
+ final long delay = (long) (ticksWorkedPerClosedTask(now) * delayCoefficient);
+
+ /*
+ * Cap the result to defaultTicksPerTask, since the calculated delay may overstep it.
+ */
+ return Math.min(delay, defaultTicksPerTask());
+ }
+}
@Beta
public final class ConnectingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
+ /**
+ * A wild estimate on how deep a queue should be. Without having knowledge of the remote actor we can only
+ * guess its processing capabilities while we are doing initial buffering. With {@link AveragingProgressTracker}
+ * this boils down to a burst of up to 2000 messages before we start throttling.
+ */
+ private static final int TARGET_QUEUE_DEPTH = 4000;
+
// Initial state, never instantiated externally
ConnectingClientConnection(final ClientActorContext context, final Long cookie) {
- super(context, cookie, new TransmitQueue.Halted());
+ super(context, cookie, new TransmitQueue.Halted(TARGET_QUEUE_DEPTH));
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.access.client;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for tracking throughput and computing delays when processing stream of tasks.
+ *
+ * <p>The idea is to improve throughput in a typical request-response scenario.
+ * A "frontend" is sending requests towards "backend", backend is sending responses back to fronted.
+ * Both frontend and backend may be realized by multiple Java threads,
+ * so there may be multiple requests not yet responded to.
+ * In terms of taks processing, frontend is "opening" tasks and backend is "closing" them.
+ * Latency of the backend may fluctuate wildly. To avoid backend running out of open tasks,
+ * there should be a queue of requests frontend can add to.
+ * In order to avoid excessive memore consumption, there should be a back-pressure mechanism
+ * which blocks the frontend threads for appropriate durations.
+ * Frontend can tolerate moderately delayed responses, but it only tolerates small block times.
+ *
+ * <p>An ideal back-pressure algorithm would keep the queue reasonably full,
+ * while fairly delaying frontend threads. In other words, backend idle time should be low,
+ * as well as frontend block time dispersion
+ * (as opposed to block time average, which is dictated by overall performance).
+ *
+ * <p>In order for an algorithm to compute reasonable wait times,
+ * various inputs can be useful, mostly related to timing of various stages of task processing.
+ * Methods of this class assume "enqueue and wait" usage.
+ * The delay computation is pessimistic, it expects each participating thread to enqueue another task
+ * as soon as its delay time allows.
+ *
+ * <p>This class is not thread safe, the callers are responsible for guarding against conflicting access.
+ * Time is measured in ticks (nanos), methods never look at current time, relying on {@code now} argument instead.
+ * Input data used for tracking is tightly coupled with TransitQueue#recordCompletion arguments.
+ *
+ * @author Vratko Polak
+ */
+// TODO: Would bulk methods be less taxing than a loop of single task calls?
+@NotThreadSafe
+abstract class ProgressTracker {
+ private static final Logger LOG = LoggerFactory.getLogger(ProgressTracker.class);
+
+ /**
+ * When no tasks has been closed yet, this will be used to estimate throughput.
+ */
+ private final long defaultTicksPerTask;
+
+ /**
+ * Number of tasks closed so far.
+ */
+ private long tasksClosed = 0;
+
+ /**
+ * Number of tasks so far, both open and closed.
+ */
+ private long tasksEncountered = 0;
+
+ /**
+ * The most recent tick number when the number of open tasks has become non-positive.
+ */
+ private long lastIdle = Long.MIN_VALUE;
+
+ /**
+ * The most recent tick number when a task has been closed.
+ */
+ private long lastClosed = Long.MIN_VALUE;
+
+ /**
+ * Tick number when the farthest known wait time is over.
+ */
+ private long nearestAllowed = Long.MIN_VALUE;
+
+ /**
+ * Number of ticks elapsed before lastIdle while there was at least one open task.
+ */
+ private long elapsedBeforeIdle = 0L;
+
+ // Constructors
+
+ /**
+ * Construct an idle tracker with specified ticks per task value to use as default.
+ *
+ * @param ticksPerTask value to use as default
+ */
+ ProgressTracker(final long ticksPerTask) {
+ Preconditions.checkArgument(ticksPerTask >= 0);
+ defaultTicksPerTask = ticksPerTask;
+ }
+
+ /**
+ * Construct a copy of an existing tracker, all future tracking is fully independent.
+ *
+ * @param tracker the instance to copy state from
+ */
+ ProgressTracker(final ProgressTracker tracker) {
+ this.defaultTicksPerTask = tracker.defaultTicksPerTask;
+ this.tasksClosed = tracker.tasksClosed;
+ this.tasksEncountered = tracker.tasksEncountered;
+ this.lastClosed = tracker.lastClosed;
+ this.lastIdle = tracker.lastIdle;
+ this.nearestAllowed = tracker.nearestAllowed;
+ this.elapsedBeforeIdle = tracker.elapsedBeforeIdle;
+ }
+
+ // Public shared access (read-only) accessor-like methods
+
+ /**
+ * Get the value of default ticks per task this instance was created to use.
+ *
+ * @return default ticks per task value
+ */
+ public final long defaultTicksPerTask() {
+ return defaultTicksPerTask;
+ }
+
+ /**
+ * Get number of tasks closed so far.
+ *
+ * @return number of tasks known to be finished already; the value never decreases
+ */
+ public final long tasksClosed() {
+ return tasksClosed;
+ }
+
+ /**
+ * Get umber of tasks so far, both open and closed.
+ *
+ * @return number of tasks encountered so far, open or finished; the value never decreases
+ */
+ public final long tasksEncountered() {
+ return tasksEncountered;
+ }
+
+ /**
+ * Get number of tasks currently open.
+ *
+ * @return number of tasks started but not finished yet
+ */
+ public final long tasksOpen() {
+ // TODO: Should we check the return value is non-negative?
+ return tasksEncountered - tasksClosed;
+ }
+
+ /**
+ * When idle, there are no open tasks so no progress is made.
+ *
+ * @return {@code true} if every encountered task is already closed, {@code false} otherwise
+ */
+ public boolean isIdle() {
+ return tasksClosed >= tasksEncountered;
+ }
+
+ /**
+ * Number of ticks elapsed (before now) since the last closed task while there was at least one open task.
+ *
+ * @param now tick number corresponding to caller's present
+ * @return number of ticks backend is neither idle nor responding
+ */
+ public long ticksStalling(final long now) {
+ return isIdle() ? 0 : Math.max(now, lastClosed) - lastClosed;
+ }
+
+ /**
+ * Number of ticks elapsed (before now) while there was at least one open task.
+ *
+ * @param now tick number corresponding to caller's present
+ * @return number of ticks there was at least one task open
+ */
+ public long ticksWorked(final long now) {
+ return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
+ }
+
+ /**
+ * One task is roughly estimated to take this long to close.
+ *
+ * @param now tick number corresponding to caller's present
+ * @return total ticks worked divided by closed tasks, or the default value if no closed tasks
+ */
+ public double ticksWorkedPerClosedTask(final long now) {
+ if (tasksClosed < 1) {
+ return defaultTicksPerTask;
+ }
+ return (double) ticksWorked(now) / tasksClosed;
+ }
+
+ /**
+ * Give an estimate of openTask() return value.
+ *
+ * <p>When the returned delay is positive, the caller thread should wait that time before opening additional task.
+ *
+ * <p>This method in general takes into account previously assigned delays to avoid overlaps.
+ *
+ * @param now tick number corresponding to caller's present
+ * @return delay (in ticks) after which another openTask() is fair to be called by the same thread again
+ */
+ public long estimateDelay(final long now) {
+ return estimateAllowed(now) - now;
+ }
+
+ /**
+ * Give an estimate of a tick number when there will be no accumulated delays.
+ *
+ * <p>The delays accumulated include one more open task.
+ * Basically, the return value corresponds to openTask() return value,
+ * but this gives an absolute time, instead of delay relative to now.
+ *
+ * @param now tick number corresponding to caller's present
+ * @return estimated tick number when all threads with opened tasks are done waiting
+ */
+ public long estimateAllowed(final long now) {
+ return Math.max(now, nearestAllowed) + estimateIsolatedDelay(now);
+ }
+
+ // State-altering public methods.
+
+ /**
+ * Track a task is being closed.
+ *
+ * @param now tick number corresponding to caller's present
+ * @param enqueuedTicks see TransitQueue#recordCompletion
+ * @param transmitTicks see TransitQueue#recordCompletion
+ * @param execNanos see TransitQueue#recordCompletion
+ */
+ public void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
+ if (isIdle()) {
+ LOG.info("Attempted to close a task while no tasks are open");
+ } else {
+ protectedCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
+ }
+ }
+
+ /**
+ * Track a task that is being opened.
+ *
+ * @param now tick number corresponding to caller's present
+ * @return number of ticks (nanos) the caller thread should wait before opening another task
+ */
+ public long openTask(final long now) {
+ protectedOpenTask(now);
+ return reserveDelay(now);
+ }
+
+ // Internal state-altering methods. Protected instead of private,
+ // allowing subclasses to weaken ad-hoc invariants of current implementation.
+
+ /**
+ * Compute the next delay and update nearestAllowed value accordingly.
+ *
+ * @param now tick number corresponding to caller's present
+ * @return number of ticks (nanos) the caller thread should wait before opening another task
+ */
+ protected long reserveDelay(final long now) {
+ nearestAllowed = estimateAllowed(now);
+ return nearestAllowed - now;
+ }
+
+ /**
+ * Track a task is being closed.
+ *
+ * <p>This method does not verify there was any task open.
+ * This call can empty the collection of open tasks, that special case should be handled.
+ *
+ * @param now tick number corresponding to caller's present
+ * @param enqueuedTicks see TransitQueue#recordCompletion
+ * @param transmitTicks see TransitQueue#recordCompletion
+ * @param execNanos see TransitQueue#recordCompletion
+ */
+ protected void protectedCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
+ final long execNanos) {
+ tasksClosed++;
+ lastClosed = now;
+ if (isIdle()) {
+ elapsedBeforeIdle += now - lastIdle;
+ }
+ }
+
+ /**
+ * Track a task is being opened.
+ *
+ * <p>This method does not aggregate delays, allowing the caller to sidestep the throttling.
+ * This call can make the collection of open tasks non-empty, that special case should be handled.
+ *
+ * @param now tick number corresponding to caller's present
+ */
+ protected void protectedOpenTask(final long now) {
+ if (isIdle()) {
+ lastIdle = Math.max(now, lastIdle);
+ }
+ tasksEncountered++;
+ }
+
+ /**
+ * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
+ *
+ * @param now tick number corresponding to caller's present
+ * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
+ */
+ abstract long estimateIsolatedDelay(final long now);
+}
@Override
protected void forwardEntry(final ConnectionEntry entry, final long now) {
+ // We are ignoring requested delay, as we have already paid the admission delay
successor().enqueueEntry(entry, now);
}
}
@NotThreadSafe
abstract class TransmitQueue {
static final class Halted extends TransmitQueue {
+ Halted(final int targetDepth) {
+ super(targetDepth);
+ }
+
@Override
int canTransmitCount(final int inflightSize) {
return 0;
private final BackendInfo backend;
private long nextTxSequence;
- Transmitting(final BackendInfo backend) {
+ Transmitting(final int targetDepth, final BackendInfo backend) {
+ super(targetDepth);
this.backend = Preconditions.checkNotNull(backend);
}
private final ArrayDeque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
private final ArrayDeque<ConnectionEntry> pending = new ArrayDeque<>();
-
+ private final ProgressTracker tracker;
private ReconnectForwarder successor;
+ TransmitQueue(final int targetDepth) {
+ tracker = new AveragingProgressTracker(targetDepth);
+ }
+
final Iterable<ConnectionEntry> asIterable() {
return Iterables.concat(inflight, pending);
}
- private void recordCompletion(final long now, final long enqueuedTicks, final long transmitTicks,
- final long execNanos) {
- // TODO: record
+ final long ticksStalling(final long now) {
+ return tracker.ticksStalling(now);
}
+ // If a matching request was found, this will track a task was closed.
final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
if (maybeEntry == null) {
}
final TransmittedConnectionEntry entry = maybeEntry.get();
- recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
+ tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
// We have freed up a slot, try to transmit something
int toSend = canTransmitCount(inflight.size());
return Optional.of(entry);
}
- final void enqueue(final ConnectionEntry entry, final long now) {
+ /**
+ * Enqueue an entry, possibly also transmitting it.
+ *
+ * @return Delay to be forced on the calling thread, in nanoseconds.
+ */
+ final long enqueue(final ConnectionEntry entry, final long now) {
if (successor != null) {
successor.forwardEntry(entry, now);
- return;
+ return 0;
}
+ // Reserve an entry before we do anything that can fail
+ final long delay = tracker.openTask(now);
if (canTransmitCount(inflight.size()) <= 0) {
LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
pending.add(entry);
- return;
+ } else {
+ // We are not thread-safe and are supposed to be externally-guarded,
+ // hence send-before-record should be fine.
+ // This needs to be revisited if the external guards are lowered.
+ inflight.offer(transmit(entry, now));
+ LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
}
-
- // We are not thread-safe and are supposed to be externally-guarded, hence send-before-record should be fine.
- // This needs to be revisited if the external guards are lowered.
- inflight.offer(transmit(entry, now));
- LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
+ return delay;
}
+ /**
+ * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
+ */
abstract int canTransmitCount(int inflightSize);
abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);