import akka.actor.ActorRef;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.primitives.UnsignedLong;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(AbstractShardBackendResolver.class);
/**
- * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
- * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
- * non-operational.
+ * Connect request timeout. If the shard does not respond within this interval, we retry the lookup and connection.
*/
// TODO: maybe make this configurable somehow?
- private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
+ private static final Timeout CONNECT_TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
private final AtomicLong nextSessionId = new AtomicLong();
private final Function1<ActorRef, ?> connectFunction;
protected final ShardState resolveBackendInfo(final String shardName, final long cookie) {
LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
- return new ShardState(FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(i -> {
- LOG.debug("Looking up primary info for {} from {}", shardName, i);
- return FutureConverters.toJava(ExplicitAsk.ask(i.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
- }).thenApply(response -> {
- if (response instanceof RequestFailure) {
- final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
- LOG.debug("Connect request failed {}", failure, failure.getCause());
- throw Throwables.propagate(failure.getCause());
+ final CompletableFuture<ShardBackendInfo> future = new CompletableFuture<>();
+ FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).whenComplete((info, failure) -> {
+ if (failure == null) {
+ connectShard(shardName, cookie, info, future);
+ return;
}
- LOG.debug("Resolved backend information to {}", response);
+ LOG.debug("Shard {} failed to resolve", shardName, failure);
+ if (failure instanceof NoShardLeaderException) {
+ // FIXME: this actually is an exception we can retry on
+ future.completeExceptionally(failure);
+ } else if (failure instanceof NotInitializedException) {
+ // FIXME: this actually is an exception we can retry on
+ LOG.info("Shard {} has not initialized yet", shardName);
+ future.completeExceptionally(failure);
+ } else if (failure instanceof PrimaryNotFoundException) {
+ LOG.info("Failed to find primary for shard {}", shardName);
+ future.completeExceptionally(failure);
+ } else {
+ future.completeExceptionally(failure);
+ }
+ });
- Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
- final ConnectClientSuccess success = (ConnectClientSuccess) response;
+ return new ShardState(future);
+ }
- return new ShardBackendInfo(success.getBackend(),
- nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
- success.getDataTree(), success.getMaxMessages());
- }));
+ private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info,
+ final CompletableFuture<ShardBackendInfo> future) {
+ LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info);
+
+ FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
+ .whenComplete((response, failure) -> {
+ if (failure != null) {
+ LOG.debug("Connect attempt to {} failed", shardName, failure);
+ future.completeExceptionally(failure);
+ return;
+ }
+ if (response instanceof RequestFailure) {
+ final RequestException cause = ((RequestFailure<?, ?>) response).getCause();
+ LOG.debug("Connect attempt to {} failed to process", shardName, cause);
+ future.completeExceptionally(cause);
+ return;
+ }
+
+ LOG.debug("Resolved backend information to {}", response);
+ Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}",
+ response);
+ final ConnectClientSuccess success = (ConnectClientSuccess) response;
+ future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
+ success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
+ success.getMaxMessages()));
+ });
}
}