import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
LOG.debug("Shard {} failed to resolve", shardName, failure);
if (failure instanceof NoShardLeaderException) {
- // FIXME: this actually is an exception we can retry on
- future.completeExceptionally(failure);
+ future.completeExceptionally(wrap("Shard has no current leader", failure));
} else if (failure instanceof NotInitializedException) {
// FIXME: this actually is an exception we can retry on
LOG.info("Shard {} has not initialized yet", shardName);
return new ShardState(future);
}
+ private static TimeoutException wrap(final String message, final Throwable cause) {
+ final TimeoutException ret = new TimeoutException(message);
+ ret.initCause(Preconditions.checkNotNull(cause));
+ return ret;
+ }
+
private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info,
final CompletableFuture<ShardBackendInfo> future) {
LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info);
FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
.whenComplete((response, failure) -> {
if (failure != null) {
- LOG.debug("Connect attempt to {} failed", shardName, failure);
- future.completeExceptionally(failure);
+ LOG.debug("Connect attempt to {} failed, will retry", shardName, failure);
+ future.completeExceptionally(wrap("Connection attempt failed", failure));
return;
}
if (response instanceof RequestFailure) {
final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
LOG.debug("Connect attempt to {} failed to process", shardName, cause);
- future.completeExceptionally(cause);
+ final Throwable result = cause instanceof NotLeaderException
+ ? wrap("Leader moved during establishment", cause) : cause;
+ future.completeExceptionally(result);
return;
}