import com.google.common.base.Verify;
import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
}
private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
- private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(5, TimeUnit.SECONDS);
+ private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(1, TimeUnit.SECONDS);
/**
* Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations
}
private ClientActorBehavior<T> internalOnRequestFailure(final FailureEnvelope command) {
+ final AbstractClientConnection<T> conn = getConnection(command);
+ if (conn != null) {
+ /*
+ * We are talking to multiple actors, which may be lagging behind our state significantly. This has
+ * the effect that we may be receiving responses from a previous connection after we have created a new
+ * one to a different actor.
+ *
+ * Since we are already replaying requests to the new actor, we want to ignore errors reported on the old
+ * connection -- for example NotLeaderException, which must not cause a new reconnect. Check the envelope's
+ * sessionId and if it does not match our current connection just ignore it.
+ */
+ final Optional<T> optBackend = conn.getBackendInfo();
+ if (optBackend.isPresent() && optBackend.get().getSessionId() != command.getSessionId()) {
+ LOG.debug("{}: Mismatched current connection {} and envelope {}, ignoring response", persistenceId(),
+ conn, command);
+ return this;
+ }
+ }
+
final RequestFailure<?, ?> failure = command.getMessage();
final RequestException cause = failure.getCause();
if (cause instanceof RetiredGenerationException) {
return null;
}
if (cause instanceof NotLeaderException) {
- final AbstractClientConnection<T> conn = getConnection(command);
if (conn instanceof ReconnectingClientConnection) {
// Already reconnecting, do not churn the logs
return this;
}
}
if (cause instanceof OutOfSequenceEnvelopeException) {
- final AbstractClientConnection<T> conn = getConnection(command);
if (conn instanceof ReconnectingClientConnection) {
// Already reconnecting, do not churn the logs
return this;
} else if (conn != null) {
- LOG.info("{}: connection {} indicated no sequencing mismatch on {} sequence {}, reconnecting it",
- persistenceId(), conn, failure.getTarget(), failure.getSequence(), cause);
+ LOG.info("{}: connection {} indicated sequencing mismatch on {} sequence {} ({}), reconnecting it",
+ persistenceId(), conn, failure.getTarget(), failure.getSequence(), command.getTxSequence(), cause);
return conn.reconnect(this, cause);
}
}