@NotThreadSafe
abstract class TransmitQueue {
static final class Halted extends TransmitQueue {
+ Halted(final int targetDepth) {
+ super(targetDepth);
+ }
+
@Override
int canTransmitCount(final int inflightSize) {
return 0;
private final BackendInfo backend;
private long nextTxSequence;
- Transmitting(final BackendInfo backend) {
+ Transmitting(final int targetDepth, final BackendInfo backend) {
+ super(targetDepth);
this.backend = Preconditions.checkNotNull(backend);
}
private final ArrayDeque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
private final ArrayDeque<ConnectionEntry> pending = new ArrayDeque<>();
-
+ private final ProgressTracker tracker;
private ReconnectForwarder successor;
+ TransmitQueue(final int targetDepth) {
+ tracker = new AveragingProgressTracker(targetDepth);
+ }
+
final Iterable<ConnectionEntry> asIterable() {
return Iterables.concat(inflight, pending);
}
- private void recordCompletion(final long now, final long enqueuedTicks, final long transmitTicks,
- final long execNanos) {
- // TODO: record
+ final long ticksStalling(final long now) {
+ return tracker.ticksStalling(now);
}
+ // If a matching request was found, this will track a task was closed.
final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
if (maybeEntry == null) {
}
final TransmittedConnectionEntry entry = maybeEntry.get();
- recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
+ tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
// We have freed up a slot, try to transmit something
int toSend = canTransmitCount(inflight.size());
return Optional.of(entry);
}
- final void enqueue(final ConnectionEntry entry, final long now) {
+ /**
+ * Enqueue an entry, possibly also transmitting it.
+ *
+ * @return Delay to be forced on the calling thread, in nanoseconds.
+ */
+ final long enqueue(final ConnectionEntry entry, final long now) {
if (successor != null) {
successor.forwardEntry(entry, now);
- return;
+ return 0;
}
+ // Reserve an entry before we do anything that can fail
+ final long delay = tracker.openTask(now);
if (canTransmitCount(inflight.size()) <= 0) {
LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
pending.add(entry);
- return;
+ } else {
+ // We are not thread-safe and are supposed to be externally-guarded,
+ // hence send-before-record should be fine.
+ // This needs to be revisited if the external guards are lowered.
+ inflight.offer(transmit(entry, now));
+ LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
}
-
- // We are not thread-safe and are supposed to be externally-guarded, hence send-before-record should be fine.
- // This needs to be revisited if the external guards are lowered.
- inflight.offer(transmit(entry, now));
- LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
+ return delay;
}
+ /**
+ * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
+ */
abstract int canTransmitCount(int inflightSize);
abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);