private final TransmitQueue queue;
private final Long cookie;
- private volatile RequestException poisoned;
+ // Updated from actor thread only
private long lastProgress;
+ private volatile RequestException poisoned;
+
// Do not allow subclassing outside of this package
AbstractClientConnection(final ClientActorContext context, final Long cookie,
final TransmitQueue queue) {
final void receiveResponse(final ResponseEnvelope<?> envelope) {
final long now = readTime();
+ final Optional<TransmittedConnectionEntry> maybeEntry;
lock.lock();
try {
- queue.complete(envelope, now);
+ maybeEntry = queue.complete(envelope, now);
} finally {
lock.unlock();
}
+ if (maybeEntry.isPresent()) {
+ final TransmittedConnectionEntry entry = maybeEntry.get();
+ LOG.debug("Completing {} with {}", entry, envelope);
+ entry.complete(envelope.getMessage());
+ }
+
lastProgress = readTime();
}
}