BUG-8445: ignore responses from mismatched sessions 16/59616/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 28 Jun 2017 08:11:47 +0000 (10:11 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Wed, 28 Jun 2017 12:31:59 +0000 (12:31 +0000)
We have to check the session ID of the response in order not to
wreck transmit consistency if face of leader changes and reconnects.

If we reconnect the connection to the new leader before we saw all
responses from the old leader, we end up in a situation where the
old leader completes some of the replayed messages before we either
send them to the new leader or receive (the correct) reply.

Guard against this by checking the session ID before attempting to
pair a response to a request.

Change-Id: I28fa98b89c679715c3a0c546962d00533e76aa5d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 0ea09c71a5902f1ebf27ad683be634ded773e2c7)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java

index 380fdeb862bdb8fbb64b79cae0594803dd7fde7c..b55e02213ea402f61619d82c7d7fa7708043d7f0 100644 (file)
@@ -368,7 +368,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             queue.remove(now);
             LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head);
 
-            final double time = (beenOpen * 1.0) / 1_000_000_000;
+            final double time = beenOpen * 1.0 / 1_000_000_000;
             head.complete(head.getRequest().toRequestFailure(
                 new RequestTimeoutException("Timed out after " + time + "seconds")));
         }
@@ -405,7 +405,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         return poisoned;
     }
 
-    final void receiveResponse(final ResponseEnvelope<?> envelope) {
+    void receiveResponse(final ResponseEnvelope<?> envelope) {
         final long now = currentTime();
         lastReceivedTicks = now;
 
index a27470db8959455182c8705699464e142ff2315d..c87556ce986830418a2ee02d1b9c3509f7086c78 100644 (file)
@@ -10,6 +10,9 @@ package org.opendaylight.controller.cluster.access.client;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its
@@ -20,6 +23,8 @@ import java.util.Optional;
  * @param <T> Concrete {@link BackendInfo} type
  */
 abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractReceivingClientConnection.class);
+
     /**
      * Multiplication factor applied to remote's advertised limit on outstanding messages. Our default strategy
      * rate-limiting strategy in {@link AveragingProgressTracker} does not penalize threads as long as we have not
@@ -54,6 +59,15 @@ abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends
         return Optional.of(backend);
     }
 
+    @Override
+    final void receiveResponse(final ResponseEnvelope<?> envelope) {
+        if (envelope.getSessionId() != backend.getSessionId()) {
+            LOG.debug("Response {} does not match session ID {}, ignoring it", envelope, backend.getSessionId());
+        } else {
+            super.receiveResponse(envelope);
+        }
+    }
+
     final T backend() {
         return backend;
     }