X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;h=627c1df757328f2d759cb8e69a7d9007c371e0ea;hb=80e6514d56cd4dc6aa40997dea2b460723148341;hp=9ba118ed6b9c95a1d5c914a4b681413af4cce95d;hpb=90a377ec4fe7c1aa4df6d4fde74bfc8189e95b08;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index 9ba118ed6b..627c1df757 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -9,9 +9,11 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.base.Verify; import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -59,7 +61,7 @@ public abstract class ClientActorBehavior extends } private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); - private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(5, TimeUnit.SECONDS); + private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(1, TimeUnit.SECONDS); /** * Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations @@ -159,6 +161,25 @@ public abstract class ClientActorBehavior extends } private ClientActorBehavior internalOnRequestFailure(final FailureEnvelope command) { + final AbstractClientConnection conn = getConnection(command); + if (conn != null) { + /* + * We are talking to multiple actors, which may be lagging behind our state significantly. This has + * the effect that we may be receiving responses from a previous connection after we have created a new + * one to a different actor. + * + * Since we are already replaying requests to the new actor, we want to ignore errors reported on the old + * connection -- for example NotLeaderException, which must not cause a new reconnect. Check the envelope's + * sessionId and if it does not match our current connection just ignore it. + */ + final Optional optBackend = conn.getBackendInfo(); + if (optBackend.isPresent() && optBackend.get().getSessionId() != command.getSessionId()) { + LOG.debug("{}: Mismatched current connection {} and envelope {}, ignoring response", persistenceId(), + conn, command); + return this; + } + } + final RequestFailure failure = command.getMessage(); final RequestException cause = failure.getCause(); if (cause instanceof RetiredGenerationException) { @@ -168,7 +189,6 @@ public abstract class ClientActorBehavior extends return null; } if (cause instanceof NotLeaderException) { - final AbstractClientConnection conn = getConnection(command); if (conn instanceof ReconnectingClientConnection) { // Already reconnecting, do not churn the logs return this; @@ -178,13 +198,12 @@ public abstract class ClientActorBehavior extends } } if (cause instanceof OutOfSequenceEnvelopeException) { - final AbstractClientConnection conn = getConnection(command); if (conn instanceof ReconnectingClientConnection) { // Already reconnecting, do not churn the logs return this; } else if (conn != null) { - LOG.info("{}: connection {} indicated no sequencing mismatch on {} sequence {}, reconnecting it", - persistenceId(), conn, failure.getTarget(), failure.getSequence(), cause); + LOG.info("{}: connection {} indicated sequencing mismatch on {} sequence {} ({}), reconnecting it", + persistenceId(), conn, failure.getTarget(), failure.getSequence(), command.getTxSequence(), cause); return conn.reconnect(this, cause); } } @@ -278,6 +297,8 @@ public abstract class ClientActorBehavior extends LOG.info("{}: resolved shard {} to {}", persistenceId(), shard, backend); final long stamp = connectionsLock.writeLock(); try { + final Stopwatch sw = Stopwatch.createStarted(); + // Create a new connected connection final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), conn.cookie(), backend); @@ -301,7 +322,7 @@ public abstract class ClientActorBehavior extends LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo", persistenceId(), conn, existing, newConn); } else { - LOG.info("{}: replaced connection {} with {}", persistenceId(), conn, newConn); + LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), conn, newConn, sw); } } finally { connectionsLock.unlockWrite(stamp);