Poison entries outside of main lock
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / AbstractClientConnection.java
index 361027af1de299fab960c487c32b0c09e51bbdb1..0d45dd5c6918ca2d97b61898b4f291349e48a4f2 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -316,9 +317,10 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      */
     @VisibleForTesting
     final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
-        final Optional<Long> delay;
-
         lock.lock();
+
+        final List<ConnectionEntry> poisonEntries;
+        final NoProgressException poisonCause;
         try {
             haveTimer = false;
             final long now = currentTime();
@@ -328,36 +330,38 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
             // FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
             final long ticksSinceProgress = queue.ticksStalling(now);
-            if (ticksSinceProgress >= context.config().getNoProgressTimeout()) {
-                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
-                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+            if (ticksSinceProgress < context.config().getNoProgressTimeout()) {
+                // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
+                // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual
+                // tri-state return convention.
+                final Optional<Long> delay = lockedCheckTimeout(now);
+                if (delay == null) {
+                    // We have timed out. There is no point in scheduling a timer
+                    LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
+                    return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
+                        new TimeoutException()));
+                }
 
-                lockedPoison(new NoProgressException(ticksSinceProgress));
-                current.removeConnection(this);
-                return current;
-            }
+                if (delay.isPresent()) {
+                    // If there is new delay, schedule a timer
+                    scheduleTimer(delay.get());
+                } else {
+                    LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
+                }
 
-            // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
-            // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state
-            // return convention.
-            delay = lockedCheckTimeout(now);
-            if (delay == null) {
-                // We have timed out. There is no point in scheduling a timer
-                LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
-                return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
-                    new TimeoutException()));
+                return current;
             }
 
-            if (delay.isPresent()) {
-                // If there is new delay, schedule a timer
-                scheduleTimer(delay.get());
-            } else {
-                LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
-            }
+            LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
+                TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+            poisonCause = new NoProgressException(ticksSinceProgress);
+            poisonEntries = lockedPoison(poisonCause);
+            current.removeConnection(this);
         } finally {
             lock.unlock();
         }
 
+        poison(poisonEntries, poisonCause);
         return current;
     }
 
@@ -435,18 +439,31 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     final void poison(final RequestException cause) {
+        final List<ConnectionEntry> entries;
+
         lock.lock();
         try {
-            lockedPoison(cause);
+            entries = lockedPoison(cause);
         } finally {
             lock.unlock();
         }
+
+        poison(entries, cause);
+    }
+
+    // Do not hold any locks while calling this
+    private static void poison(final Collection<? extends ConnectionEntry> entries, final RequestException cause) {
+        for (ConnectionEntry e : entries) {
+            final Request<?, ?> request = e.getRequest();
+            LOG.trace("Poisoning request {}", request, cause);
+            e.complete(request.toRequestFailure(cause));
+        }
     }
 
     @Holding("lock")
-    private void lockedPoison(final RequestException cause) {
+    private List<ConnectionEntry> lockedPoison(final RequestException cause) {
         poisoned = enrichPoison(cause);
-        queue.poison(cause);
+        return queue.poison();
     }
 
     RequestException enrichPoison(final RequestException ex) {