import com.google.common.base.Verify;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
/**
* A behavior, which handles messages sent to a {@link AbstractClientActor}.
}
private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
+ private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(5, TimeUnit.SECONDS);
/**
* Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations
private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> conn,
final T backend, final Throwable failure) {
if (failure != null) {
+ if (failure instanceof TimeoutException) {
+ if (!conn.equals(connections.get(shard))) {
+ // AbstractClientConnection will remove itself when it decides there is no point in continuing,
+ // at which point we want to stop retrying
+ LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, conn,
+ failure);
+ return;
+ }
+
+ LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard,
+ RESOLVE_RETRY_DURATION, failure);
+ context().executeInActor(b -> {
+ resolveConnection(shard, conn);
+ return b;
+ }, RESOLVE_RETRY_DURATION);
+ return;
+ }
+
LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure);
- conn.poison(new RuntimeRequestException("Failed to resolve shard " + shard, failure));
+ final RequestException cause;
+ if (failure instanceof RequestException) {
+ cause = (RequestException) failure;
+ } else {
+ cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure);
+ }
+
+ conn.poison(cause);
return;
}
private ConnectingClientConnection<T> createConnection(final Long shard) {
final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard);
+ resolveConnection(shard, conn);
+ return conn;
+ }
+ private void resolveConnection(final Long shard, final AbstractClientConnection<T> conn) {
+ LOG.debug("{}: resolving shard {} connection {}", persistenceId(), shard, conn);
resolver().getBackendInfo(shard).whenComplete((backend, failure) -> context().executeInActor(behavior -> {
backendConnectFinished(shard, conn, backend, failure);
return behavior;
}));
-
- return conn;
}
}