TransmitQueue should not be dispatching responses while
the connection lock is being held, as a that may expose
clients to AB/BA deadlocks if the callback tries to acquire
a lock which is being held by another thread attempting
to touch the same connection (i.e. transmit).
Fix this by TransmitQueue returning the entry to be
completed and AbstractClientConnection checking its
presence and performing actual completion after it has
dropped the connection lock.
Change-Id: Ibbacb641a297bfe7d4790af8401b036285c26593
Signed-off-by: Robert Varga <rovarga@cisco.com>
private final TransmitQueue queue;
private final Long cookie;
private final TransmitQueue queue;
private final Long cookie;
- private volatile RequestException poisoned;
+ // Updated from actor thread only
private long lastProgress;
private long lastProgress;
+ private volatile RequestException poisoned;
+
// Do not allow subclassing outside of this package
AbstractClientConnection(final ClientActorContext context, final Long cookie,
final TransmitQueue queue) {
// Do not allow subclassing outside of this package
AbstractClientConnection(final ClientActorContext context, final Long cookie,
final TransmitQueue queue) {
final void receiveResponse(final ResponseEnvelope<?> envelope) {
final long now = readTime();
final void receiveResponse(final ResponseEnvelope<?> envelope) {
final long now = readTime();
+ final Optional<TransmittedConnectionEntry> maybeEntry;
- queue.complete(envelope, now);
+ maybeEntry = queue.complete(envelope, now);
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
+ if (maybeEntry.isPresent()) {
+ final TransmittedConnectionEntry entry = maybeEntry.get();
+ LOG.debug("Completing {} with {}", entry, envelope);
+ entry.complete(envelope.getMessage());
+ }
+
lastProgress = readTime();
}
}
lastProgress = readTime();
}
}
- final void complete(final ResponseEnvelope<?> envelope, final long now) {
+ final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
if (maybeEntry == null) {
LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
if (maybeEntry == null) {
LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
if (maybeEntry == null || !maybeEntry.isPresent()) {
LOG.warn("No request matching {} found, ignoring response", envelope);
if (maybeEntry == null || !maybeEntry.isPresent()) {
LOG.warn("No request matching {} found, ignoring response", envelope);
+ return Optional.empty();
}
final TransmittedConnectionEntry entry = maybeEntry.get();
}
final TransmittedConnectionEntry entry = maybeEntry.get();
- LOG.debug("Completing {} with {}", entry, envelope);
- entry.complete(envelope.getMessage());
-
recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
// We have freed up a slot, try to transmit something
recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
// We have freed up a slot, try to transmit something
transmit(e, now);
toSend--;
}
transmit(e, now);
toSend--;
}
+
+ return Optional.of(entry);
}
final void enqueue(final ConnectionEntry entry, final long now) {
}
final void enqueue(final ConnectionEntry entry, final long now) {