tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
// We have freed up a slot, try to transmit something
+ tryTransmit(now);
+
+ return Optional.of(entry);
+ }
+
+ final void tryTransmit(final long now) {
final int toSend = canTransmitCount(inflight.size());
if (toSend > 0 && !pending.isEmpty()) {
transmitEntries(toSend, now);
}
-
- return Optional.of(entry);
}
private void transmitEntries(final int maxTransmit, final long now) {
}
}
+ final void remove(final long now) {
+ final TransmittedConnectionEntry txe = inflight.poll();
+ if (txe == null) {
+ final ConnectionEntry entry = pending.pop();
+ tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
+ } else {
+ tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
+ }
+ }
+
@VisibleForTesting
Deque<TransmittedConnectionEntry> getInflight() {
return inflight;
}
queue.clear();
}
-
}