Poison entries outside of main lock 49/81949/4
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 7 May 2019 17:42:26 +0000 (19:42 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 9 May 2019 08:47:35 +0000 (10:47 +0200)
Poisoning entries may involve reaction from their callbacks, which
can attempt to circle back through connections.

Make sure we poison them outside of lock context, so that any
callbacks end up seeing a poisoned connection, but without the lock
being held -- hence the locks can be acquired in-order.

JIRA: CONTROLLER-1893
Change-Id: I26551d052307812e76f3e45024a77dbb83312b17
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java

index 361027a..0d45dd5 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -316,9 +317,10 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      */
     @VisibleForTesting
     final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
-        final Optional<Long> delay;
-
         lock.lock();
+
+        final List<ConnectionEntry> poisonEntries;
+        final NoProgressException poisonCause;
         try {
             haveTimer = false;
             final long now = currentTime();
@@ -328,36 +330,38 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
             // FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
             final long ticksSinceProgress = queue.ticksStalling(now);
-            if (ticksSinceProgress >= context.config().getNoProgressTimeout()) {
-                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
-                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+            if (ticksSinceProgress < context.config().getNoProgressTimeout()) {
+                // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
+                // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual
+                // tri-state return convention.
+                final Optional<Long> delay = lockedCheckTimeout(now);
+                if (delay == null) {
+                    // We have timed out. There is no point in scheduling a timer
+                    LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
+                    return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
+                        new TimeoutException()));
+                }
 
-                lockedPoison(new NoProgressException(ticksSinceProgress));
-                current.removeConnection(this);
-                return current;
-            }
+                if (delay.isPresent()) {
+                    // If there is new delay, schedule a timer
+                    scheduleTimer(delay.get());
+                } else {
+                    LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
+                }
 
-            // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
-            // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state
-            // return convention.
-            delay = lockedCheckTimeout(now);
-            if (delay == null) {
-                // We have timed out. There is no point in scheduling a timer
-                LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
-                return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
-                    new TimeoutException()));
+                return current;
             }
 
-            if (delay.isPresent()) {
-                // If there is new delay, schedule a timer
-                scheduleTimer(delay.get());
-            } else {
-                LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
-            }
+            LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
+                TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+            poisonCause = new NoProgressException(ticksSinceProgress);
+            poisonEntries = lockedPoison(poisonCause);
+            current.removeConnection(this);
         } finally {
             lock.unlock();
         }
 
+        poison(poisonEntries, poisonCause);
         return current;
     }
 
@@ -435,18 +439,31 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     final void poison(final RequestException cause) {
+        final List<ConnectionEntry> entries;
+
         lock.lock();
         try {
-            lockedPoison(cause);
+            entries = lockedPoison(cause);
         } finally {
             lock.unlock();
         }
+
+        poison(entries, cause);
+    }
+
+    // Do not hold any locks while calling this
+    private static void poison(final Collection<? extends ConnectionEntry> entries, final RequestException cause) {
+        for (ConnectionEntry e : entries) {
+            final Request<?, ?> request = e.getRequest();
+            LOG.trace("Poisoning request {}", request, cause);
+            e.complete(request.toRequestFailure(cause));
+        }
     }
 
     @Holding("lock")
-    private void lockedPoison(final RequestException cause) {
+    private List<ConnectionEntry> lockedPoison(final RequestException cause) {
         poisoned = enrichPoison(cause);
-        queue.poison(cause);
+        return queue.poison();
     }
 
     RequestException enrichPoison(final RequestException ex) {
index be14d05..e4ee78c 100644 (file)
@@ -13,14 +13,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
@@ -336,9 +337,13 @@ abstract class TransmitQueue {
         return pending.peek();
     }
 
-    final void poison(final RequestException cause) {
-        poisonQueue(inflight, cause);
-        poisonQueue(pending, cause);
+    final List<ConnectionEntry> poison() {
+        final List<ConnectionEntry> entries = new ArrayList<>(inflight.size() + pending.size());
+        entries.addAll(inflight);
+        inflight.clear();
+        entries.addAll(pending);
+        pending.clear();
+        return entries;
     }
 
     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
@@ -442,13 +447,4 @@ abstract class TransmitQueue {
 
         return null;
     }
-
-    private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
-        for (ConnectionEntry e : queue) {
-            final Request<?, ?> request = e.getRequest();
-            LOG.trace("Poisoning request {}", request, cause);
-            e.complete(request.toRequestFailure(cause));
-        }
-        queue.clear();
-    }
 }
index 69788e5..7bebd7a 100644 (file)
@@ -8,9 +8,11 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import static org.hamcrest.CoreMatchers.everyItem;
-import static org.mockito.Matchers.any;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
 
 import akka.actor.ActorSystem;
@@ -21,10 +23,8 @@ import java.util.Collection;
 import java.util.Optional;
 import java.util.function.Consumer;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
@@ -35,7 +35,6 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
@@ -82,14 +81,14 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
             queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         }
         final Collection<ConnectionEntry> entries = queue.drain();
-        Assert.assertEquals(sentMessages, entries.size());
-        Assert.assertThat(entries, everyItem(entryWithRequest(request)));
+        assertEquals(sentMessages, entries.size());
+        assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
     @Test
     public void testTicksStalling() {
         final long now = Ticker.systemTicker().read();
-        Assert.assertEquals(0, queue.ticksStalling(now));
+        assertEquals(0, queue.ticksStalling(now));
     }
 
     @Test
@@ -106,38 +105,38 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
         final Optional<TransmittedConnectionEntry> completed1 =
                 queue.complete(new SuccessEnvelope(success1, sessionId, txSequence, 1L), now);
-        Assert.assertFalse(completed1.isPresent());
+        assertFalse(completed1.isPresent());
         //different response sequence
         final long differentResponseSequence = 1L;
         final RequestSuccess<?, ?> success2 =
                 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, differentResponseSequence);
         final Optional<TransmittedConnectionEntry> completed2 =
                 queue.complete(new SuccessEnvelope(success2, sessionId, txSequence, 1L), now);
-        Assert.assertFalse(completed2.isPresent());
+        assertFalse(completed2.isPresent());
         //different tx sequence
         final long differentTxSequence = 1L;
         final RequestSuccess<?, ?> success3 =
                 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
         final Optional<TransmittedConnectionEntry> completed3 =
                 queue.complete(new SuccessEnvelope(success3, sessionId, differentTxSequence, 1L), now);
-        Assert.assertFalse(completed3.isPresent());
+        assertFalse(completed3.isPresent());
         //different session id
         final long differentSessionId = 1L;
         final RequestSuccess<?, ?> success4 =
                 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
         final Optional<TransmittedConnectionEntry> completed4 =
                 queue.complete(new SuccessEnvelope(success4, differentSessionId, differentTxSequence, 1L), now);
-        Assert.assertFalse(completed4.isPresent());
+        assertFalse(completed4.isPresent());
     }
 
     @Test
     public void testIsEmpty() {
-        Assert.assertTrue(queue.isEmpty());
+        assertTrue(queue.isEmpty());
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
-        Assert.assertFalse(queue.isEmpty());
+        assertFalse(queue.isEmpty());
     }
 
     @Test
@@ -150,7 +149,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
         queue.enqueueOrForward(entry1, now);
         queue.enqueueOrForward(entry2, now);
-        Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest());
+        assertEquals(entry1.getRequest(), queue.peek().getRequest());
     }
 
     @Test
@@ -159,9 +158,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
-        queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
-        verify(callback).accept(any(TransactionFailure.class));
-        Assert.assertTrue(queue.isEmpty());
+        assertEquals(1, queue.poison().size());
     }
 
     @SuppressWarnings("unchecked")

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.