Poison entries outside of main lock 32/82032/2
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 7 May 2019 17:42:26 +0000 (19:42 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 16 May 2019 14:14:26 +0000 (14:14 +0000)
Poisoning entries may involve reaction from their callbacks, which
can attempt to circle back through connections.

Make sure we poison them outside of lock context, so that any
callbacks end up seeing a poisoned connection, but without the lock
being held -- hence the locks can be acquired in-order.

JIRA: CONTROLLER-1893
Change-Id: I26551d052307812e76f3e45024a77dbb83312b17
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit e983d61d93fe2da50f9c4112fa28c7fe4ee5ffef)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java

index 03d4691cb44fe14eb05c33790632859c17d14b8f..e0739dd6c5a36cdff73e22ff64cb261baf8f1373 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -317,9 +318,10 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      */
     @VisibleForTesting
     final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
-        final Optional<Long> delay;
-
         lock.lock();
+
+        final List<ConnectionEntry> poisonEntries;
+        final NoProgressException poisonCause;
         try {
             haveTimer = false;
             final long now = currentTime();
@@ -329,36 +331,38 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
             // FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
             final long ticksSinceProgress = queue.ticksStalling(now);
-            if (ticksSinceProgress >= context.config().getNoProgressTimeout()) {
-                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
-                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+            if (ticksSinceProgress < context.config().getNoProgressTimeout()) {
+                // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
+                // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual
+                // tri-state return convention.
+                final Optional<Long> delay = lockedCheckTimeout(now);
+                if (delay == null) {
+                    // We have timed out. There is no point in scheduling a timer
+                    LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
+                    return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
+                        new TimeoutException()));
+                }
 
-                lockedPoison(new NoProgressException(ticksSinceProgress));
-                current.removeConnection(this);
-                return current;
-            }
+                if (delay.isPresent()) {
+                    // If there is new delay, schedule a timer
+                    scheduleTimer(delay.get());
+                } else {
+                    LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
+                }
 
-            // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
-            // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state
-            // return convention.
-            delay = lockedCheckTimeout(now);
-            if (delay == null) {
-                // We have timed out. There is no point in scheduling a timer
-                LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
-                return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
-                    new TimeoutException()));
+                return current;
             }
 
-            if (delay.isPresent()) {
-                // If there is new delay, schedule a timer
-                scheduleTimer(delay.get());
-            } else {
-                LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
-            }
+            LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
+                TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+            poisonCause = new NoProgressException(ticksSinceProgress);
+            poisonEntries = lockedPoison(poisonCause);
+            current.removeConnection(this);
         } finally {
             lock.unlock();
         }
 
+        poison(poisonEntries, poisonCause);
         return current;
     }
 
@@ -436,18 +440,31 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     final void poison(final RequestException cause) {
+        final List<ConnectionEntry> entries;
+
         lock.lock();
         try {
-            lockedPoison(cause);
+            entries = lockedPoison(cause);
         } finally {
             lock.unlock();
         }
+
+        poison(entries, cause);
+    }
+
+    // Do not hold any locks while calling this
+    private static void poison(final Collection<? extends ConnectionEntry> entries, final RequestException cause) {
+        for (ConnectionEntry e : entries) {
+            final Request<?, ?> request = e.getRequest();
+            LOG.trace("Poisoning request {}", request, cause);
+            e.complete(request.toRequestFailure(cause));
+        }
     }
 
     @GuardedBy("lock")
-    private void lockedPoison(final RequestException cause) {
+    private List<ConnectionEntry> lockedPoison(final RequestException cause) {
         poisoned = enrichPoison(cause);
-        queue.poison(cause);
+        return queue.poison();
     }
 
     RequestException enrichPoison(final RequestException ex) {
index 0313a72a8319fc107a967a2bfbb7c188b4a08ac4..da64ddf69f93af2a882cda25c261001ca8eb5510 100644 (file)
@@ -13,15 +13,16 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
@@ -338,9 +339,13 @@ abstract class TransmitQueue {
         return pending.peek();
     }
 
-    final void poison(final RequestException cause) {
-        poisonQueue(inflight, cause);
-        poisonQueue(pending, cause);
+    final List<ConnectionEntry> poison() {
+        final List<ConnectionEntry> entries = new ArrayList<>(inflight.size() + pending.size());
+        entries.addAll(inflight);
+        inflight.clear();
+        entries.addAll(pending);
+        pending.clear();
+        return entries;
     }
 
     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
@@ -444,13 +449,4 @@ abstract class TransmitQueue {
 
         return null;
     }
-
-    private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
-        for (ConnectionEntry e : queue) {
-            final Request<?, ?> request = e.getRequest();
-            LOG.trace("Poisoning request {}", request, cause);
-            e.complete(request.toRequestFailure(cause));
-        }
-        queue.clear();
-    }
 }
index 69788e5da124327760ba5b63388101ab79a00104..7bebd7ac74b228a4e794be31aaa132b2ca212cd9 100644 (file)
@@ -8,9 +8,11 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import static org.hamcrest.CoreMatchers.everyItem;
-import static org.mockito.Matchers.any;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
 
 import akka.actor.ActorSystem;
@@ -21,10 +23,8 @@ import java.util.Collection;
 import java.util.Optional;
 import java.util.function.Consumer;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
@@ -35,7 +35,6 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
@@ -82,14 +81,14 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
             queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         }
         final Collection<ConnectionEntry> entries = queue.drain();
-        Assert.assertEquals(sentMessages, entries.size());
-        Assert.assertThat(entries, everyItem(entryWithRequest(request)));
+        assertEquals(sentMessages, entries.size());
+        assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
     @Test
     public void testTicksStalling() {
         final long now = Ticker.systemTicker().read();
-        Assert.assertEquals(0, queue.ticksStalling(now));
+        assertEquals(0, queue.ticksStalling(now));
     }
 
     @Test
@@ -106,38 +105,38 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
         final Optional<TransmittedConnectionEntry> completed1 =
                 queue.complete(new SuccessEnvelope(success1, sessionId, txSequence, 1L), now);
-        Assert.assertFalse(completed1.isPresent());
+        assertFalse(completed1.isPresent());
         //different response sequence
         final long differentResponseSequence = 1L;
         final RequestSuccess<?, ?> success2 =
                 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, differentResponseSequence);
         final Optional<TransmittedConnectionEntry> completed2 =
                 queue.complete(new SuccessEnvelope(success2, sessionId, txSequence, 1L), now);
-        Assert.assertFalse(completed2.isPresent());
+        assertFalse(completed2.isPresent());
         //different tx sequence
         final long differentTxSequence = 1L;
         final RequestSuccess<?, ?> success3 =
                 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
         final Optional<TransmittedConnectionEntry> completed3 =
                 queue.complete(new SuccessEnvelope(success3, sessionId, differentTxSequence, 1L), now);
-        Assert.assertFalse(completed3.isPresent());
+        assertFalse(completed3.isPresent());
         //different session id
         final long differentSessionId = 1L;
         final RequestSuccess<?, ?> success4 =
                 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
         final Optional<TransmittedConnectionEntry> completed4 =
                 queue.complete(new SuccessEnvelope(success4, differentSessionId, differentTxSequence, 1L), now);
-        Assert.assertFalse(completed4.isPresent());
+        assertFalse(completed4.isPresent());
     }
 
     @Test
     public void testIsEmpty() {
-        Assert.assertTrue(queue.isEmpty());
+        assertTrue(queue.isEmpty());
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
-        Assert.assertFalse(queue.isEmpty());
+        assertFalse(queue.isEmpty());
     }
 
     @Test
@@ -150,7 +149,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
         queue.enqueueOrForward(entry1, now);
         queue.enqueueOrForward(entry2, now);
-        Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest());
+        assertEquals(entry1.getRequest(), queue.peek().getRequest());
     }
 
     @Test
@@ -159,9 +158,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
-        queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
-        verify(callback).accept(any(TransactionFailure.class));
-        Assert.assertTrue(queue.isEmpty());
+        assertEquals(1, queue.poison().size());
     }
 
     @SuppressWarnings("unchecked")