We have to check the session ID of the response in order not to
wreck transmit consistency if face of leader changes and reconnects.
If we reconnect the connection to the new leader before we saw all
responses from the old leader, we end up in a situation where the
old leader completes some of the replayed messages before we either
send them to the new leader or receive (the correct) reply.
Guard against this by checking the session ID before attempting to
pair a response to a request.
Change-Id: I28fa98b89c679715c3a0c546962d00533e76aa5d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
0ea09c71a5902f1ebf27ad683be634ded773e2c7)
queue.remove(now);
LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head);
queue.remove(now);
LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head);
- final double time = (beenOpen * 1.0) / 1_000_000_000;
+ final double time = beenOpen * 1.0 / 1_000_000_000;
head.complete(head.getRequest().toRequestFailure(
new RequestTimeoutException("Timed out after " + time + "seconds")));
}
head.complete(head.getRequest().toRequestFailure(
new RequestTimeoutException("Timed out after " + time + "seconds")));
}
- final void receiveResponse(final ResponseEnvelope<?> envelope) {
+ void receiveResponse(final ResponseEnvelope<?> envelope) {
final long now = currentTime();
lastReceivedTicks = now;
final long now = currentTime();
lastReceivedTicks = now;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import java.util.Optional;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its
/**
* Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its
* @param <T> Concrete {@link BackendInfo} type
*/
abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
* @param <T> Concrete {@link BackendInfo} type
*/
abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractReceivingClientConnection.class);
+
/**
* Multiplication factor applied to remote's advertised limit on outstanding messages. Our default strategy
* rate-limiting strategy in {@link AveragingProgressTracker} does not penalize threads as long as we have not
/**
* Multiplication factor applied to remote's advertised limit on outstanding messages. Our default strategy
* rate-limiting strategy in {@link AveragingProgressTracker} does not penalize threads as long as we have not
return Optional.of(backend);
}
return Optional.of(backend);
}
+ @Override
+ final void receiveResponse(final ResponseEnvelope<?> envelope) {
+ if (envelope.getSessionId() != backend.getSessionId()) {
+ LOG.debug("Response {} does not match session ID {}, ignoring it", envelope, backend.getSessionId());
+ } else {
+ super.receiveResponse(envelope);
+ }
+ }
+
final T backend() {
return backend;
}
final T backend() {
return backend;
}