import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
}
@GuardedBy("lock")
- abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
+ abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current,
+ RequestException runtimeRequestException);
final long enqueueEntry(final ConnectionEntry entry, final long now) {
lock.lock();
}
}
- final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
+ final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current, final RequestException cause) {
lock.lock();
try {
- return lockedReconnect(current);
+ return lockedReconnect(current, cause);
} finally {
lock.unlock();
}
delay = lockedCheckTimeout(now);
if (delay == null) {
// We have timed out. There is no point in scheduling a timer
- return lockedReconnect(current);
+ return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
+ new TimeoutException()));
}
if (delay.isPresent()) {
@GuardedBy("lock")
private void lockedPoison(final RequestException cause) {
- poisoned = cause;
+ poisoned = enrichPoison(cause);
queue.poison(cause);
}
+ RequestException enrichPoison(final RequestException ex) {
+ return ex;
+ }
+
@VisibleForTesting
final RequestException poisoned() {
return poisoned;
* If the completion stage returned by this interface's methods fails with a
* {@link org.opendaylight.controller.cluster.access.concepts.RequestException}, it will be forwarded to all
* outstanding requests towards the leader. If it fails with a {@link java.util.concurrent.TimeoutException},
- * resolution process will be retries. If it fails with any other cause, it will we wrapped as a
+ * resolution process will be retried. If it fails with any other cause, it will we wrapped as a
* {@link org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException} wrapping that cause.
*
* @author Robert Varga
return this;
} else if (conn != null) {
LOG.info("{}: connection {} indicated no leadership, reconnecting it", persistenceId(), conn, cause);
- return conn.reconnect(this);
+ return conn.reconnect(this, cause);
}
}
if (cause instanceof OutOfSequenceEnvelopeException) {
} else if (conn != null) {
LOG.info("{}: connection {} indicated no sequencing mismatch on {} sequence {}, reconnecting it",
persistenceId(), conn, failure.getTarget(), failure.getSequence(), cause);
- return conn.reconnect(this);
+ return conn.reconnect(this, cause);
}
}
import com.google.common.annotations.Beta;
import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
@Beta
@NotThreadSafe
}
@Override
- ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
- final ReconnectingClientConnection<T> next = new ReconnectingClientConnection<>(this);
+ ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current, final RequestException cause) {
+ final ReconnectingClientConnection<T> next = new ReconnectingClientConnection<>(this, cause);
setForwarder(new SimpleReconnectForwarder(next));
current.reconnectConnection(this, next);
return current;
import com.google.common.annotations.Beta;
import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
@Beta
public final class ConnectingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
}
@Override
- ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
- throw new UnsupportedOperationException("Attempted to reconnect a connecting connection");
+ ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current, final RequestException cause) {
+ throw new UnsupportedOperationException("Attempted to reconnect a connecting connection", cause);
}
}
*/
package org.opendaylight.controller.cluster.access.client;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class ReconnectingClientConnection<T extends BackendInfo> extends AbstractReceivingClientConnection<T> {
private static final Logger LOG = LoggerFactory.getLogger(ReconnectingClientConnection.class);
- ReconnectingClientConnection(final ConnectedClientConnection<T> oldConnection) {
+ private RequestException cause;
+
+ ReconnectingClientConnection(final ConnectedClientConnection<T> oldConnection, final RequestException cause) {
super(oldConnection);
+ this.cause = Preconditions.checkNotNull(cause);
}
@Override
- ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current) {
- // Intentional no-op
+ ClientActorBehavior<T> lockedReconnect(final ClientActorBehavior<T> current, final RequestException cause) {
+ this.cause = Preconditions.checkNotNull(cause);
LOG.debug("Skipping reconnect of already-reconnecting connection {}", this);
return current;
}
+
+ @Override
+ RequestException enrichPoison(final RequestException ex) {
+ if (ex.getCause() != null) {
+ ex.addSuppressed(cause);
+ } else {
+ ex.initCause(cause);
+ }
+
+ return ex;
+ }
+
}
import org.junit.Test;
import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
public class ConnectedClientConnectionTest
extends AbstractClientConnectionTest<ConnectedClientConnection<BackendInfo>, BackendInfo> {
@Test
public void testReconnectConnection() throws Exception {
final ClientActorBehavior<BackendInfo> behavior = mock(ClientActorBehavior.class);
- connection.lockedReconnect(behavior);
+ connection.lockedReconnect(behavior, mock(RequestException.class));
verify(behavior).reconnectConnection(same(connection), any(ReconnectingClientConnection.class));
}
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
final ConnectedClientConnection<BackendInfo> oldConnection =
new ConnectedClientConnection<>(context, 0L, backend);
- return new ReconnectingClientConnection<>(oldConnection);
+ return new ReconnectingClientConnection<>(oldConnection, mock(RequestException.class));
}
@Override
@Test
public void testReconnectConnection() throws Exception {
final ClientActorBehavior<BackendInfo> behavior = mock(ClientActorBehavior.class);
- Assert.assertSame(behavior, connection.lockedReconnect(behavior));
+ Assert.assertSame(behavior, connection.lockedReconnect(behavior, mock(RequestException.class)));
}
@Override
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
LOG.debug("Shard {} failed to resolve", shardName, failure);
if (failure instanceof NoShardLeaderException) {
- // FIXME: this actually is an exception we can retry on
- future.completeExceptionally(failure);
+ future.completeExceptionally(wrap("Shard has no current leader", failure));
} else if (failure instanceof NotInitializedException) {
// FIXME: this actually is an exception we can retry on
LOG.info("Shard {} has not initialized yet", shardName);
return new ShardState(future);
}
+ private static TimeoutException wrap(final String message, final Throwable cause) {
+ final TimeoutException ret = new TimeoutException(message);
+ ret.initCause(Preconditions.checkNotNull(cause));
+ return ret;
+ }
+
private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info,
final CompletableFuture<ShardBackendInfo> future) {
LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info);
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
// Create the write Tx.
- try (DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
- : dataStore.newReadWriteTransaction()) {
+ DOMStoreWriteTransaction writeTxToClose = null;
+ try {
+ writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
+ : dataStore.newReadWriteTransaction();
+ final DOMStoreWriteTransaction writeTx = writeTxToClose;
assertNotNull("newReadWriteTransaction returned null", writeTx);
// Do some modifications and ready the Tx on a separate
txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
fail("Expected NoShardLeaderException");
} catch (final ExecutionException e) {
- Throwables.propagate(Throwables.getRootCause(e));
+ assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
+ assertEquals(DistributedDataStore.class, testParameter);
+ } catch (TimeoutException e) {
+ // ClientBackedDataStore doesn't set cause to ExecutionException, future just time outs
+ assertEquals(ClientBackedDataStore.class, testParameter);
+ }
+ } finally {
+ try {
+ if (writeTxToClose != null) {
+ writeTxToClose.close();
+ }
+ } catch (Exception e) {
+ // FIXME TransactionProxy.close throws IllegalStateException:
+ // Transaction is ready, it cannot be closed
}
}
}
};
}
- @Test(expected = NoShardLeaderException.class)
+ @Test
public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
}
- @Test(expected = NoShardLeaderException.class)
+ @Test
public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
}
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Assume;
final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
|| e.getCause() instanceof ShardLeaderNotRespondingException);
+ assertEquals(DistributedDataStore.class, testParameter);
+ } catch (final TimeoutException e) {
+ // ClientBackedDataStore doesn't set cause to ExecutionException, future just time outs
+ assertEquals(ClientBackedDataStore.class, testParameter);
}
}
final String msg = "Expected instance of NoShardLeaderException, actual: \n"
+ Throwables.getStackTraceAsString(e.getCause());
assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
+ assertEquals(DistributedDataStore.class, testParameter);
+ } catch (TimeoutException e) {
+ // ClientBackedDataStore doesn't set cause to ExecutionException, future just time outs
+ assertEquals(ClientBackedDataStore.class, testParameter);
}
}