From: Robert Varga Date: Wed, 28 Jun 2017 08:11:47 +0000 (+0200) Subject: BUG-8445: ignore responses from mismatched sessions X-Git-Tag: release/nitrogen~80 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=aabd940f4eb88df7cc4ba1a6b6a061ffb864960c BUG-8445: ignore responses from mismatched sessions We have to check the session ID of the response in order not to wreck transmit consistency if face of leader changes and reconnects. If we reconnect the connection to the new leader before we saw all responses from the old leader, we end up in a situation where the old leader completes some of the replayed messages before we either send them to the new leader or receive (the correct) reply. Guard against this by checking the session ID before attempting to pair a response to a request. Change-Id: I28fa98b89c679715c3a0c546962d00533e76aa5d Signed-off-by: Robert Varga (cherry picked from commit 0ea09c71a5902f1ebf27ad683be634ded773e2c7) --- diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 380fdeb862..b55e02213e 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -368,7 +368,7 @@ public abstract class AbstractClientConnection { queue.remove(now); LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head); - final double time = (beenOpen * 1.0) / 1_000_000_000; + final double time = beenOpen * 1.0 / 1_000_000_000; head.complete(head.getRequest().toRequestFailure( new RequestTimeoutException("Timed out after " + time + "seconds"))); } @@ -405,7 +405,7 @@ public abstract class AbstractClientConnection { return poisoned; } - final void receiveResponse(final ResponseEnvelope envelope) { + void receiveResponse(final ResponseEnvelope envelope) { final long now = currentTime(); lastReceivedTicks = now; diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java index a27470db89..c87556ce98 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java @@ -10,6 +10,9 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Preconditions; import java.util.Optional; +import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its @@ -20,6 +23,8 @@ import java.util.Optional; * @param Concrete {@link BackendInfo} type */ abstract class AbstractReceivingClientConnection extends AbstractClientConnection { + private static final Logger LOG = LoggerFactory.getLogger(AbstractReceivingClientConnection.class); + /** * Multiplication factor applied to remote's advertised limit on outstanding messages. Our default strategy * rate-limiting strategy in {@link AveragingProgressTracker} does not penalize threads as long as we have not @@ -54,6 +59,15 @@ abstract class AbstractReceivingClientConnection extends return Optional.of(backend); } + @Override + final void receiveResponse(final ResponseEnvelope envelope) { + if (envelope.getSessionId() != backend.getSessionId()) { + LOG.debug("Response {} does not match session ID {}, ignoring it", envelope, backend.getSessionId()); + } else { + super.receiveResponse(envelope); + } + } + final T backend() { return backend; }