import com.google.common.base.Verify;
import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
}
private ClientActorBehavior<T> internalOnRequestFailure(final FailureEnvelope command) {
+ final AbstractClientConnection<T> conn = getConnection(command);
+ if (conn != null) {
+ /*
+ * We are talking to multiple actors, which may be lagging behind our state significantly. This has
+ * the effect that we may be receiving responses from a previous connection after we have created a new
+ * one to a different actor.
+ *
+ * Since we are already replaying requests to the new actor, we want to ignore errors reported on the old
+ * connection -- for example NotLeaderException, which must not cause a new reconnect. Check the envelope's
+ * sessionId and if it does not match our current connection just ignore it.
+ */
+ final Optional<T> optBackend = conn.getBackendInfo();
+ if (optBackend.isPresent() && optBackend.get().getSessionId() != command.getSessionId()) {
+ LOG.debug("{}: Mismatched current connection {} and envelope {}, ignoring response", persistenceId(),
+ conn, command);
+ return this;
+ }
+ }
+
final RequestFailure<?, ?> failure = command.getMessage();
final RequestException cause = failure.getCause();
if (cause instanceof RetiredGenerationException) {
return null;
}
if (cause instanceof NotLeaderException) {
- final AbstractClientConnection<T> conn = getConnection(command);
if (conn instanceof ReconnectingClientConnection) {
// Already reconnecting, do not churn the logs
return this;
}
}
if (cause instanceof OutOfSequenceEnvelopeException) {
- final AbstractClientConnection<T> conn = getConnection(command);
if (conn instanceof ReconnectingClientConnection) {
// Already reconnecting, do not churn the logs
return this;