BUG-5280: Fix deadlock with TransmitQueue 24/50024/4
authorRobert Varga <rovarga@cisco.com>
Wed, 4 Jan 2017 15:10:57 +0000 (16:10 +0100)
committerRobert Varga <rovarga@cisco.com>
Wed, 4 Jan 2017 15:28:49 +0000 (16:28 +0100)
TransmitQueue should not be dispatching responses while
the connection lock is being held, as a that may expose
clients to AB/BA deadlocks if the callback tries to acquire
a lock which is being held by another thread attempting
to touch the same connection (i.e. transmit).

Fix this by TransmitQueue returning the entry to be
completed and AbstractClientConnection checking its
presence and performing actual completion after it has
dropped the connection lock.

Change-Id: Ibbacb641a297bfe7d4790af8401b036285c26593
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java

index 0366e7ace2496c0f5edd5433f237ca09db569a01..7dc150e403dc283a2181aad081fb1757afc690ea 100644 (file)
@@ -50,9 +50,11 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     private final TransmitQueue queue;
     private final Long cookie;
 
-    private volatile RequestException poisoned;
+    // Updated from actor thread only
     private long lastProgress;
 
+    private volatile RequestException poisoned;
+
     // Do not allow subclassing outside of this package
     AbstractClientConnection(final ClientActorContext context, final Long cookie,
             final TransmitQueue queue) {
@@ -252,13 +254,20 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     final void receiveResponse(final ResponseEnvelope<?> envelope) {
         final long now = readTime();
 
+        final Optional<TransmittedConnectionEntry> maybeEntry;
         lock.lock();
         try {
-            queue.complete(envelope, now);
+            maybeEntry = queue.complete(envelope, now);
         } finally {
             lock.unlock();
         }
 
+        if (maybeEntry.isPresent()) {
+            final TransmittedConnectionEntry entry = maybeEntry.get();
+            LOG.debug("Completing {} with {}", entry, envelope);
+            entry.complete(envelope.getMessage());
+        }
+
         lastProgress = readTime();
     }
 }
index 8690236aa6eaab06eda2cd87ca9b6220d5252633..e1c5589004ec6685b086efacc32972d890516d8b 100644 (file)
@@ -104,7 +104,7 @@ abstract class TransmitQueue {
         // TODO: record
     }
 
-    final void complete(final ResponseEnvelope<?> envelope, final long now) {
+    final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
         if (maybeEntry == null) {
             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
@@ -113,13 +113,10 @@ abstract class TransmitQueue {
 
         if (maybeEntry == null || !maybeEntry.isPresent()) {
             LOG.warn("No request matching {} found, ignoring response", envelope);
-            return;
+            return Optional.empty();
         }
 
         final TransmittedConnectionEntry entry = maybeEntry.get();
-        LOG.debug("Completing {} with {}", entry, envelope);
-        entry.complete(envelope.getMessage());
-
         recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
 
         // We have freed up a slot, try to transmit something
@@ -134,6 +131,8 @@ abstract class TransmitQueue {
             transmit(e, now);
             toSend--;
         }
+
+        return Optional.of(entry);
     }
 
     final void enqueue(final ConnectionEntry entry, final long now) {