BUG-8452: make NoShardLeaderException retriable 92/57092/2
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 15 May 2017 14:56:14 +0000 (16:56 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 07:45:58 +0000 (09:45 +0200)
We can recover from this exception by retrying the connection to
the backend. Wrap it in a TimeoutException, which will cause a new
connection attempt.

Change-Id: I1d5c771fdb89cbdd7723e0425542154a1ed85853
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit c74608b67d88d809ebec51c0e84add37a0b98711)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfoResolver.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnectionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index da016ba..cd81a4e 100644 (file)
@@ -16,6 +16,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
@@ -26,6 +27,7 @@ import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
@@ -173,7 +175,8 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     @GuardedBy("lock")
-    abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
+    abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current,
+            RequestException runtimeRequestException);
 
     final long enqueueEntry(final ConnectionEntry entry, final long now) {
         lock.lock();
@@ -193,10 +196,10 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         }
     }
 
-    final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
+    final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current, final RequestException cause) {
         lock.lock();
         try {
-            return lockedReconnect(current);
+            return lockedReconnect(current, cause);
         } finally {
             lock.unlock();
         }
@@ -261,7 +264,8 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             delay = lockedCheckTimeout(now);
             if (delay == null) {
                 // We have timed out. There is no point in scheduling a timer
-                return lockedReconnect(current);
+                return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
+                    new TimeoutException()));
             }
 
             if (delay.isPresent()) {
@@ -339,10 +343,14 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     @GuardedBy("lock")
     private void lockedPoison(final RequestException cause) {
-        poisoned = cause;
+        poisoned = enrichPoison(cause);
         queue.poison(cause);
     }
 
+    RequestException enrichPoison(final RequestException ex) {
+        return ex;
+    }
+
     @VisibleForTesting
     final RequestException poisoned() {
         return poisoned;
index 2e8ab4e..4ece691 100644 (file)
@@ -21,7 +21,7 @@ import javax.annotation.Nonnull;
  * If the completion stage returned by this interface's methods fails with a
  * {@link org.opendaylight.controller.cluster.access.concepts.RequestException}, it will be forwarded to all
  * outstanding requests towards the leader. If it fails with a {@link java.util.concurrent.TimeoutException},
- * resolution process will be retries. If it fails with any other cause, it will we wrapped as a
+ * resolution process will be retried. If it fails with any other cause, it will we wrapped as a
  * {@link org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException} wrapping that cause.
  *
  * @author Robert Varga
index ee63edd..9ba118e 100644 (file)
@@ -174,7 +174,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
                 return this;
             } else if (conn != null) {
                 LOG.info("{}: connection {} indicated no leadership, reconnecting it", persistenceId(), conn, cause);
-                return conn.reconnect(this);
+                return conn.reconnect(this, cause);
             }
         }
         if (cause instanceof OutOfSequenceEnvelopeException) {
@@ -185,7 +185,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             } else if (conn != null) {
                 LOG.info("{}: connection {} indicated no sequencing mismatch on {} sequence {}, reconnecting it",
                     persistenceId(), conn, failure.getTarget(), failure.getSequence(), cause);
-                return conn.reconnect(this);
+                return conn.reconnect(this, cause);
             }
         }
 
index cade27f..0afc7ac 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.client;
 
 import com.google.common.annotations.Beta;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 
 @Beta
 @NotThreadSafe
@@ -18,8 +19,8 @@ public final class ConnectedClientConnection<T extends BackendInfo> extends Abst
     }
 
     @Override
-    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
-        final ReconnectingClientConnection<T> next = new ReconnectingClientConnection<>(this);
+    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current, final RequestException cause) {
+        final ReconnectingClientConnection<T> next = new ReconnectingClientConnection<>(this, cause);
         setForwarder(new SimpleReconnectForwarder(next));
         current.reconnectConnection(this, next);
         return current;
index 12c520b..a36267c 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.client;
 
 import com.google.common.annotations.Beta;
 import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 
 @Beta
 public final class ConnectingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
@@ -30,7 +31,7 @@ public final class ConnectingClientConnection<T extends BackendInfo> extends Abs
     }
 
     @Override
-    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
-        throw new UnsupportedOperationException("Attempted to reconnect a connecting connection");
+    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current, final RequestException cause) {
+        throw new UnsupportedOperationException("Attempted to reconnect a connecting connection", cause);
     }
 }
index 0aac7f4..b59c9e3 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.access.client;
 
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -20,14 +22,29 @@ import org.slf4j.LoggerFactory;
 public final class ReconnectingClientConnection<T extends BackendInfo> extends AbstractReceivingClientConnection<T> {
     private static final Logger LOG = LoggerFactory.getLogger(ReconnectingClientConnection.class);
 
-    ReconnectingClientConnection(final ConnectedClientConnection<T> oldConnection) {
+    private RequestException cause;
+
+    ReconnectingClientConnection(final ConnectedClientConnection<T> oldConnection, final RequestException cause) {
         super(oldConnection);
+        this.cause = Preconditions.checkNotNull(cause);
     }
 
     @Override
-    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
-        // Intentional no-op
+    ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current, final RequestException cause) {
+        this.cause = Preconditions.checkNotNull(cause);
         LOG.debug("Skipping reconnect of already-reconnecting connection {}", this);
         return current;
     }
+
+    @Override
+    RequestException enrichPoison(final RequestException ex) {
+        if (ex.getCause() != null) {
+            ex.addSuppressed(cause);
+        } else {
+            ex.initCause(cause);
+        }
+
+        return ex;
+    }
+
 }
index eb10152..e66512d 100644 (file)
@@ -14,6 +14,7 @@ import static org.mockito.Mockito.verify;
 
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 
 public class ConnectedClientConnectionTest
         extends AbstractClientConnectionTest<ConnectedClientConnection<BackendInfo>, BackendInfo> {
@@ -28,7 +29,7 @@ public class ConnectedClientConnectionTest
     @Test
     public void testReconnectConnection() throws Exception {
         final ClientActorBehavior<BackendInfo> behavior = mock(ClientActorBehavior.class);
-        connection.lockedReconnect(behavior);
+        connection.lockedReconnect(behavior, mock(RequestException.class));
         verify(behavior).reconnectConnection(same(connection), any(ReconnectingClientConnection.class));
     }
 
index 142e948..b36dd7c 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
@@ -34,14 +35,14 @@ public class ReconnectingClientConnectionTest
 
         final ConnectedClientConnection<BackendInfo> oldConnection =
                 new ConnectedClientConnection<>(context, 0L, backend);
-        return new ReconnectingClientConnection<>(oldConnection);
+        return new ReconnectingClientConnection<>(oldConnection, mock(RequestException.class));
     }
 
     @Override
     @Test
     public void testReconnectConnection() throws Exception {
         final ClientActorBehavior<BackendInfo> behavior = mock(ClientActorBehavior.class);
-        Assert.assertSame(behavior, connection.lockedReconnect(behavior));
+        Assert.assertSame(behavior, connection.lockedReconnect(behavior, mock(RequestException.class)));
     }
 
     @Override
index 93cf793..6b221da 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.primitives.UnsignedLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -107,8 +108,7 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
 
             LOG.debug("Shard {} failed to resolve", shardName, failure);
             if (failure instanceof NoShardLeaderException) {
-                // FIXME: this actually is an exception we can retry on
-                future.completeExceptionally(failure);
+                future.completeExceptionally(wrap("Shard has no current leader", failure));
             } else if (failure instanceof NotInitializedException) {
                 // FIXME: this actually is an exception we can retry on
                 LOG.info("Shard {} has not initialized yet", shardName);
@@ -124,6 +124,12 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
         return new ShardState(future);
     }
 
+    private static TimeoutException wrap(final String message, final Throwable cause) {
+        final TimeoutException ret = new TimeoutException(message);
+        ret.initCause(Preconditions.checkNotNull(cause));
+        return ret;
+    }
+
     private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info,
             final CompletableFuture<ShardBackendInfo> future) {
         LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info);
index 848a8a6..4a7a813 100644 (file)
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Before;
@@ -658,8 +659,11 @@ public class DistributedDataStoreIntegrationTest {
                     assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
 
                     // Create the write Tx.
-                    try (DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
-                            : dataStore.newReadWriteTransaction()) {
+                    DOMStoreWriteTransaction writeTxToClose = null;
+                    try {
+                        writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
+                                : dataStore.newReadWriteTransaction();
+                        final DOMStoreWriteTransaction writeTx = writeTxToClose;
                         assertNotNull("newReadWriteTransaction returned null", writeTx);
 
                         // Do some modifications and ready the Tx on a separate
@@ -698,7 +702,20 @@ public class DistributedDataStoreIntegrationTest {
                             txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
                             fail("Expected NoShardLeaderException");
                         } catch (final ExecutionException e) {
-                            Throwables.propagate(Throwables.getRootCause(e));
+                            assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
+                            assertEquals(DistributedDataStore.class, testParameter);
+                        } catch (TimeoutException e) {
+                            // ClientBackedDataStore doesn't set cause to ExecutionException, future just time outs
+                            assertEquals(ClientBackedDataStore.class, testParameter);
+                        }
+                    } finally {
+                        try {
+                            if (writeTxToClose != null) {
+                                writeTxToClose.close();
+                            }
+                        } catch (Exception e) {
+                            // FIXME TransactionProxy.close throws IllegalStateException:
+                            // Transaction is ready, it cannot be closed
                         }
                     }
                 }
@@ -706,13 +723,13 @@ public class DistributedDataStoreIntegrationTest {
         };
     }
 
-    @Test(expected = NoShardLeaderException.class)
+    @Test
     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
     }
 
-    @Test(expected = NoShardLeaderException.class)
+    @Test
     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
     }
index f58fd0d..dad3f1b 100644 (file)
@@ -42,6 +42,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.junit.After;
 import org.junit.Assume;
@@ -1024,6 +1025,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
             assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
                     || e.getCause() instanceof ShardLeaderNotRespondingException);
+            assertEquals(DistributedDataStore.class, testParameter);
+        } catch (final TimeoutException e) {
+            // ClientBackedDataStore doesn't set cause to ExecutionException, future just time outs
+            assertEquals(ClientBackedDataStore.class, testParameter);
         }
     }
 
@@ -1058,6 +1063,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             final String msg = "Expected instance of NoShardLeaderException, actual: \n"
                     + Throwables.getStackTraceAsString(e.getCause());
             assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
+            assertEquals(DistributedDataStore.class, testParameter);
+        } catch (TimeoutException e) {
+            // ClientBackedDataStore doesn't set cause to ExecutionException, future just time outs
+            assertEquals(ClientBackedDataStore.class, testParameter);
         }
     }
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.