import com.google.common.base.Verify;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
/**
* A behavior, which handles messages sent to a {@link AbstractClientActor}.
}
private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
+ private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(5, TimeUnit.SECONDS);
/**
* Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations
private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> conn,
final T backend, final Throwable failure) {
if (failure != null) {
+ if (failure instanceof TimeoutException) {
+ if (!conn.equals(connections.get(shard))) {
+ // AbstractClientConnection will remove itself when it decides there is no point in continuing,
+ // at which point we want to stop retrying
+ LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, conn,
+ failure);
+ return;
+ }
+
+ LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard,
+ RESOLVE_RETRY_DURATION, failure);
+ context().executeInActor(b -> {
+ resolveConnection(shard, conn);
+ return b;
+ }, RESOLVE_RETRY_DURATION);
+ return;
+ }
+
LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure);
- conn.poison(new RuntimeRequestException("Failed to resolve shard " + shard, failure));
+ final RequestException cause;
+ if (failure instanceof RequestException) {
+ cause = (RequestException) failure;
+ } else {
+ cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure);
+ }
+
+ conn.poison(cause);
return;
}
private ConnectingClientConnection<T> createConnection(final Long shard) {
final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard);
+ resolveConnection(shard, conn);
+ return conn;
+ }
+ private void resolveConnection(final Long shard, final AbstractClientConnection<T> conn) {
+ LOG.debug("{}: resolving shard {} connection {}", persistenceId(), shard, conn);
resolver().getBackendInfo(shard).whenComplete((backend, failure) -> context().executeInActor(behavior -> {
backendConnectFinished(shard, conn, backend, failure);
return behavior;
}));
-
- return conn;
}
}
import akka.actor.ActorRef;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.primitives.UnsignedLong;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(AbstractShardBackendResolver.class);
/**
- * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
- * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
- * non-operational.
+ * Connect request timeout. If the shard does not respond within this interval, we retry the lookup and connection.
*/
// TODO: maybe make this configurable somehow?
- private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
+ private static final Timeout CONNECT_TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
private final AtomicLong nextSessionId = new AtomicLong();
private final Function1<ActorRef, ?> connectFunction;
protected final ShardState resolveBackendInfo(final String shardName, final long cookie) {
LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
- return new ShardState(FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(i -> {
- LOG.debug("Looking up primary info for {} from {}", shardName, i);
- return FutureConverters.toJava(ExplicitAsk.ask(i.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
- }).thenApply(response -> {
- if (response instanceof RequestFailure) {
- final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
- LOG.debug("Connect request failed {}", failure, failure.getCause());
- throw Throwables.propagate(failure.getCause());
+ final CompletableFuture<ShardBackendInfo> future = new CompletableFuture<>();
+ FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).whenComplete((info, failure) -> {
+ if (failure == null) {
+ connectShard(shardName, cookie, info, future);
+ return;
}
- LOG.debug("Resolved backend information to {}", response);
+ LOG.debug("Shard {} failed to resolve", shardName, failure);
+ if (failure instanceof NoShardLeaderException) {
+ // FIXME: this actually is an exception we can retry on
+ future.completeExceptionally(failure);
+ } else if (failure instanceof NotInitializedException) {
+ // FIXME: this actually is an exception we can retry on
+ LOG.info("Shard {} has not initialized yet", shardName);
+ future.completeExceptionally(failure);
+ } else if (failure instanceof PrimaryNotFoundException) {
+ LOG.info("Failed to find primary for shard {}", shardName);
+ future.completeExceptionally(failure);
+ } else {
+ future.completeExceptionally(failure);
+ }
+ });
- Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
- final ConnectClientSuccess success = (ConnectClientSuccess) response;
+ return new ShardState(future);
+ }
- return new ShardBackendInfo(success.getBackend(),
- nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
- success.getDataTree(), success.getMaxMessages());
- }));
+ private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info,
+ final CompletableFuture<ShardBackendInfo> future) {
+ LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info);
+
+ FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
+ .whenComplete((response, failure) -> {
+ if (failure != null) {
+ LOG.debug("Connect attempt to {} failed", shardName, failure);
+ future.completeExceptionally(failure);
+ return;
+ }
+ if (response instanceof RequestFailure) {
+ final RequestException cause = ((RequestFailure<?, ?>) response).getCause();
+ LOG.debug("Connect attempt to {} failed to process", shardName, cause);
+ future.completeExceptionally(cause);
+ return;
+ }
+
+ LOG.debug("Resolved backend information to {}", response);
+            Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response %s",
+                response);
+ final ConnectClientSuccess success = (ConnectClientSuccess) response;
+ future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
+ success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
+ success.getMaxMessages()));
+ });
}
}