private final TransmitQueue queue;
private final Long cookie;
- private volatile RequestException poisoned;
+ // Updated from actor thread only
private long lastProgress;
+ private volatile RequestException poisoned;
+
// Do not allow subclassing outside of this package
AbstractClientConnection(final ClientActorContext context, final Long cookie,
final TransmitQueue queue) {
final void receiveResponse(final ResponseEnvelope<?> envelope) {
final long now = readTime();
+ final Optional<TransmittedConnectionEntry> maybeEntry;
lock.lock();
try {
- queue.complete(envelope, now);
+ maybeEntry = queue.complete(envelope, now);
} finally {
lock.unlock();
}
+ if (maybeEntry.isPresent()) {
+ final TransmittedConnectionEntry entry = maybeEntry.get();
+ LOG.debug("Completing {} with {}", entry, envelope);
+ entry.complete(envelope.getMessage());
+ }
+
lastProgress = readTime();
}
}
// TODO: record
}
- final void complete(final ResponseEnvelope<?> envelope, final long now) {
+ final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
if (maybeEntry == null) {
LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
if (maybeEntry == null || !maybeEntry.isPresent()) {
LOG.warn("No request matching {} found, ignoring response", envelope);
- return;
+ return Optional.empty();
}
final TransmittedConnectionEntry entry = maybeEntry.get();
- LOG.debug("Completing {} with {}", entry, envelope);
- entry.complete(envelope.getMessage());
-
recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
// We have freed up a slot, try to transmit something
transmit(e, now);
toSend--;
}
+
+ return Optional.of(entry);
}
final void enqueue(final ConnectionEntry entry, final long now) {