X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;h=ccfbba6cd3d94f92ef9757999352893e1b0f7270;hp=896d85b713fcecfb611cc0d23987a973125718d1;hb=417f3a8c2401e6c8ce3c91ea969da24929c24c1c;hpb=9409f87fa5f6ea0a37384a85bb4e66b974fdd9a7 diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index 896d85b713..ccfbba6cd3 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -9,14 +9,19 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.base.Verify; +import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.access.commands.NotLeaderException; +import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; @@ -52,7 +57,7 @@ public abstract class ClientActorBehavior extends * @param enqueuedEntries Previously-enqueued entries * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns. */ - @Nonnull ReconnectForwarder finishReconnect(@Nonnull Iterable enqueuedEntries); + @Nonnull ReconnectForwarder finishReconnect(@Nonnull Collection enqueuedEntries); } private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); @@ -105,6 +110,11 @@ public abstract class ClientActorBehavior extends } } + private AbstractClientConnection getConnection(final ResponseEnvelope response) { + // Always called from actor context: no locking required + return connections.get(extractCookie(response.getMessage().getTarget())); + } + @SuppressWarnings("unchecked") @Override final ClientActorBehavior onReceiveCommand(final Object command) { @@ -132,8 +142,7 @@ public abstract class ClientActorBehavior extends } private void onResponse(final ResponseEnvelope response) { - final long cookie = extractCookie(response.getMessage().getTarget()); - final AbstractClientConnection connection = connections.get(cookie); + final AbstractClientConnection connection = getConnection(response); if (connection != null) { connection.receiveResponse(response); } else { @@ -152,6 +161,25 @@ public abstract class ClientActorBehavior extends } private ClientActorBehavior internalOnRequestFailure(final FailureEnvelope command) { + final AbstractClientConnection conn = getConnection(command); + if (conn != null) { + /* + * We are talking to multiple actors, which may be lagging behind our state significantly. This has + * the effect that we may be receiving responses from a previous connection after we have created a new + * one to a different actor. + * + * Since we are already replaying requests to the new actor, we want to ignore errors reported on the old + * connection -- for example NotLeaderException, which must not cause a new reconnect. Check the envelope's + * sessionId and if it does not match our current connection just ignore it. + */ + final Optional optBackend = conn.getBackendInfo(); + if (optBackend.isPresent() && optBackend.get().getSessionId() != command.getSessionId()) { + LOG.debug("{}: Mismatched current connection {} and envelope {}, ignoring response", persistenceId(), + conn, command); + return this; + } + } + final RequestFailure failure = command.getMessage(); final RequestException cause = failure.getCause(); if (cause instanceof RetiredGenerationException) { @@ -160,6 +188,25 @@ public abstract class ClientActorBehavior extends poison(cause); return null; } + if (cause instanceof NotLeaderException) { + if (conn instanceof ReconnectingClientConnection) { + // Already reconnecting, do not churn the logs + return this; + } else if (conn != null) { + LOG.info("{}: connection {} indicated no leadership, reconnecting it", persistenceId(), conn, cause); + return conn.reconnect(this, cause); + } + } + if (cause instanceof OutOfSequenceEnvelopeException) { + if (conn instanceof ReconnectingClientConnection) { + // Already reconnecting, do not churn the logs + return this; + } else if (conn != null) { + LOG.info("{}: connection {} indicated no sequencing mismatch on {} sequence {}, reconnecting it", + persistenceId(), conn, failure.getTarget(), failure.getSequence(), cause); + return conn.reconnect(this, cause); + } + } return onRequestFailure(command); } @@ -247,19 +294,21 @@ public abstract class ClientActorBehavior extends return; } - LOG.debug("{}: resolved shard {} to {}", persistenceId(), shard, backend); + LOG.info("{}: resolved shard {} to {}", persistenceId(), shard, backend); final long stamp = connectionsLock.writeLock(); try { + final Stopwatch sw = Stopwatch.createStarted(); + // Create a new connected connection final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), conn.cookie(), backend); - LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn); + LOG.info("{}: resolving connection {} to {}", persistenceId(), conn, newConn); // Start reconnecting without the old connection lock held final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn)); // Lock the old connection and get a reference to its entries - final Iterable replayIterable = conn.startReplay(); + final Collection replayIterable = conn.startReplay(); // Finish the connection attempt final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable)); @@ -268,26 +317,61 @@ public abstract class ClientActorBehavior extends conn.finishReplay(forwarder); // Make sure new lookups pick up the new connection - connections.replace(shard, conn, newConn); - LOG.debug("{}: replaced connection {} with {}", persistenceId(), conn, newConn); + if (!connections.replace(shard, conn, newConn)) { + final AbstractClientConnection existing = connections.get(conn.cookie()); + LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo", + persistenceId(), conn, existing, newConn); + } else { + LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), conn, newConn, sw); + } } finally { connectionsLock.unlockWrite(stamp); } } void removeConnection(final AbstractClientConnection conn) { - connections.remove(conn.cookie(), conn); - LOG.debug("{}: removed connection {}", persistenceId(), conn); + final long stamp = connectionsLock.writeLock(); + try { + if (!connections.remove(conn.cookie(), conn)) { + final AbstractClientConnection existing = connections.get(conn.cookie()); + if (existing != null) { + LOG.warn("{}: failed to remove connection {}, as it was superseded by {}", persistenceId(), conn, + existing); + } else { + LOG.warn("{}: failed to remove connection {}, as it was not tracked", persistenceId(), conn); + } + } else { + LOG.info("{}: removed connection {}", persistenceId(), conn); + } + } finally { + connectionsLock.unlockWrite(stamp); + } } @SuppressWarnings("unchecked") void reconnectConnection(final ConnectedClientConnection oldConn, final ReconnectingClientConnection newConn) { final ReconnectingClientConnection conn = (ReconnectingClientConnection)newConn; - connections.replace(oldConn.cookie(), (AbstractClientConnection)oldConn, conn); - LOG.debug("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn); + LOG.info("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn); + + final long stamp = connectionsLock.writeLock(); + try { + final boolean replaced = connections.replace(oldConn.cookie(), (AbstractClientConnection)oldConn, conn); + if (!replaced) { + final AbstractClientConnection existing = connections.get(oldConn.cookie()); + if (existing != null) { + LOG.warn("{}: failed to replace connection {}, as it was superseded by {}", persistenceId(), conn, + existing); + } else { + LOG.warn("{}: failed to replace connection {}, as it was not tracked", persistenceId(), conn); + } + } + } finally { + connectionsLock.unlockWrite(stamp); + } final Long shard = oldConn.cookie(); + LOG.info("{}: refreshing backend for shard {}", persistenceId(), shard); resolver().refreshBackendInfo(shard, conn.getBackendInfo().get()).whenComplete( (backend, failure) -> context().executeInActor(behavior -> { backendConnectFinished(shard, conn, backend, failure);