X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;h=14ca1ef38c8f850071d3e1b04f520faa7a496b69;hb=refs%2Fchanges%2F05%2F83005%2F5;hp=97d312ce394a186ec4d9b4b311676218d2f21bc3;hpb=32b322afd58f120a78208c939a01422aa224d0cf;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index 97d312ce39..14ca1ef38c 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -7,13 +7,22 @@ */ package org.opendaylight.controller.cluster.access.client; +import static java.util.Objects.requireNonNull; + import com.google.common.annotations.Beta; -import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.base.Verify; +import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.NotLeaderException; +import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; @@ -21,12 +30,18 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; +import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.concepts.WritableIdentifier; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.concepts.Registration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** * A behavior, which handles messages sent to a {@link AbstractClientActor}. @@ -36,7 +51,22 @@ import org.slf4j.LoggerFactory; @Beta public abstract class ClientActorBehavior extends RecoveredClientActorBehavior implements Identifiable { + /** + * Connection reconnect cohort, driven by this class. + */ + @FunctionalInterface + protected interface ConnectionConnectCohort { + /** + * Finish the connection by replaying previous messages onto the new connection. + * + * @param enqueuedEntries Previously-enqueued entries + * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns. + */ + @NonNull ReconnectForwarder finishReconnect(@NonNull Collection enqueuedEntries); + } + private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); + private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(1, TimeUnit.SECONDS); /** * Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations @@ -54,19 +84,44 @@ public abstract class ClientActorBehavior extends private final Map> connections = new ConcurrentHashMap<>(); private final InversibleLock connectionsLock = new InversibleLock(); private final BackendInfoResolver resolver; + private final MessageAssembler responseMessageAssembler; + private final Registration staleBackendInfoReg; - protected ClientActorBehavior(@Nonnull final ClientActorContext context, - @Nonnull final BackendInfoResolver resolver) { + protected ClientActorBehavior(final @NonNull ClientActorContext context, + final @NonNull BackendInfoResolver resolver) { super(context); - this.resolver = Preconditions.checkNotNull(resolver); + this.resolver = requireNonNull(resolver); + + final ClientActorConfig config = context.config(); + responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId()) + .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(), + config.getTempFileDirectory())) + .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build(); + + staleBackendInfoReg = resolver.notifyWhenBackendInfoIsStale(shard -> { + context().executeInActor(behavior -> { + LOG.debug("BackendInfo for shard {} is now stale", shard); + final AbstractClientConnection conn = connections.get(shard); + if (conn instanceof ConnectedClientConnection) { + conn.reconnect(this, new BackendStaleException(shard)); + } + return behavior; + }); + }); } @Override - @Nonnull public final ClientIdentifier getIdentifier() { return context().getIdentifier(); } + @Override + public void close() { + super.close(); + responseMessageAssembler.close(); + staleBackendInfoReg.close(); + } + /** * Get a connection to a shard. * @@ -85,23 +140,40 @@ public abstract class ClientActorBehavior extends } } + private AbstractClientConnection getConnection(final ResponseEnvelope response) { + // Always called from actor context: no locking required + return connections.get(extractCookie(response.getMessage().getTarget())); + } + @SuppressWarnings("unchecked") @Override final ClientActorBehavior onReceiveCommand(final Object command) { if (command instanceof InternalCommand) { return ((InternalCommand) command).execute(this); } + if (command instanceof SuccessEnvelope) { return onRequestSuccess((SuccessEnvelope) command); } + if (command instanceof FailureEnvelope) { return internalOnRequestFailure((FailureEnvelope) command); } + if (MessageAssembler.isHandledMessage(command)) { + context().dispatchers().getDispatcher(DispatcherType.Serialization).execute( + () -> responseMessageAssembler.handleMessage(command, context().self())); + return this; + } + + if (context().messageSlicer().handleMessage(command)) { + return this; + } + return onCommand(command); } - private static long extractCookie(final WritableIdentifier id) { + private static long extractCookie(final Identifier id) { if (id instanceof TransactionIdentifier) { return ((TransactionIdentifier) id).getHistoryId().getCookie(); } else if (id instanceof LocalHistoryIdentifier) { @@ -112,8 +184,7 @@ public abstract class ClientActorBehavior extends } private void onResponse(final ResponseEnvelope response) { - final long cookie = extractCookie(response.getMessage().getTarget()); - final AbstractClientConnection connection = connections.get(cookie); + final AbstractClientConnection connection = getConnection(response); if (connection != null) { connection.receiveResponse(response); } else { @@ -132,6 +203,25 @@ public abstract class ClientActorBehavior extends } private ClientActorBehavior internalOnRequestFailure(final FailureEnvelope command) { + final AbstractClientConnection conn = getConnection(command); + if (conn != null) { + /* + * We are talking to multiple actors, which may be lagging behind our state significantly. This has + * the effect that we may be receiving responses from a previous connection after we have created a new + * one to a different actor. + * + * Since we are already replaying requests to the new actor, we want to ignore errors reported on the old + * connection -- for example NotLeaderException, which must not cause a new reconnect. Check the envelope's + * sessionId and if it does not match our current connection just ignore it. + */ + final Optional optBackend = conn.getBackendInfo(); + if (optBackend.isPresent() && optBackend.get().getSessionId() != command.getSessionId()) { + LOG.debug("{}: Mismatched current connection {} and envelope {}, ignoring response", persistenceId(), + conn, command); + return this; + } + } + final RequestFailure failure = command.getMessage(); final RequestException cause = failure.getCause(); if (cause instanceof RetiredGenerationException) { @@ -140,6 +230,25 @@ public abstract class ClientActorBehavior extends poison(cause); return null; } + if (cause instanceof NotLeaderException) { + if (conn instanceof ReconnectingClientConnection) { + // Already reconnecting, do not churn the logs + return this; + } else if (conn != null) { + LOG.info("{}: connection {} indicated no leadership, reconnecting it", persistenceId(), conn, cause); + return conn.reconnect(this, cause); + } + } + if (cause instanceof OutOfSequenceEnvelopeException) { + if (conn instanceof ReconnectingClientConnection) { + // Already reconnecting, do not churn the logs + return this; + } else if (conn != null) { + LOG.info("{}: connection {} indicated sequencing mismatch on {} sequence {} ({}), reconnecting it", + persistenceId(), conn, failure.getTarget(), failure.getSequence(), command.getTxSequence(), cause); + return conn.reconnect(this, cause); + } + } return onRequestFailure(command); } @@ -155,6 +264,8 @@ public abstract class ClientActorBehavior extends } finally { connectionsLock.unlockWrite(stamp); } + + context().messageSlicer().close(); } /** @@ -164,7 +275,7 @@ public abstract class ClientActorBehavior extends * * @param cause Failure cause */ - protected abstract void haltClient(@Nonnull Throwable cause); + protected abstract void haltClient(@NonNull Throwable cause); /** * Override this method to handle any command which is not handled by the base behavior. @@ -172,62 +283,143 @@ public abstract class ClientActorBehavior extends * @param command the command to process * @return Next behavior to use, null if this actor should shut down. */ - @Nullable - protected abstract ClientActorBehavior onCommand(@Nonnull Object command); + protected abstract @Nullable ClientActorBehavior onCommand(@NonNull Object command); /** * Override this method to provide a backend resolver instance. * * @return a backend resolver instance */ - protected final @Nonnull BackendInfoResolver resolver() { + protected final @NonNull BackendInfoResolver resolver() { return resolver; } /** - * Callback invoked when a new connection has been established. + * Callback invoked when a new connection has been established. Implementations are expected perform preparatory + * tasks before the previous connection is frozen. * - * @param conn Old connection - * @param backend New backend - * @return Newly-connected connection. + * @param newConn New connection + * @return ConnectionConnectCohort which will be used to complete the process of bringing the connection up. */ - @GuardedBy("connectionsLock") - protected abstract @Nonnull ConnectedClientConnection connectionUp( - final @Nonnull AbstractClientConnection conn, final @Nonnull T backend); + @Holding("connectionsLock") + protected abstract @NonNull ConnectionConnectCohort connectionUp(@NonNull ConnectedClientConnection newConn); - private void backendConnectFinished(final Long shard, final AbstractClientConnection conn, + private void backendConnectFinished(final Long shard, final AbstractClientConnection oldConn, final T backend, final Throwable failure) { if (failure != null) { + if (failure instanceof TimeoutException) { + if (!oldConn.equals(connections.get(shard))) { + // AbstractClientConnection will remove itself when it decides there is no point in continuing, + // at which point we want to stop retrying + LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, + oldConn, failure); + return; + } + + LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard, + RESOLVE_RETRY_DURATION, failure); + context().executeInActor(b -> { + resolveConnection(shard, oldConn); + return b; + }, RESOLVE_RETRY_DURATION); + return; + } + LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure); + final RequestException cause; + if (failure instanceof RequestException) { + cause = (RequestException) failure; + } else { + cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure); + } + + oldConn.poison(cause); return; } + LOG.info("{}: resolved shard {} to {}", persistenceId(), shard, backend); final long stamp = connectionsLock.writeLock(); try { - // Bring the connection up - final ConnectedClientConnection newConn = connectionUp(conn, backend); + final Stopwatch sw = Stopwatch.createStarted(); + + // Create a new connected connection + final ConnectedClientConnection newConn = new ConnectedClientConnection<>(oldConn, backend); + LOG.info("{}: resolving connection {} to {}", persistenceId(), oldConn, newConn); + + // Start reconnecting without the old connection lock held + final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn)); + + // Lock the old connection and get a reference to its entries + final Collection replayIterable = oldConn.startReplay(); + + // Finish the connection attempt + final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable)); + + // Cancel sleep debt after entries were replayed, before new connection starts receiving. + newConn.cancelDebt(); + + // Install the forwarder, unlocking the old connection + oldConn.finishReplay(forwarder); // Make sure new lookups pick up the new connection - connections.replace(shard, conn, newConn); - LOG.debug("{}: replaced connection {} with {}", persistenceId(), conn, newConn); + if (!connections.replace(shard, oldConn, newConn)) { + final AbstractClientConnection existing = connections.get(oldConn.cookie()); + LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo", + persistenceId(), oldConn, existing, newConn); + } else { + LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), oldConn, newConn, sw); + } } finally { connectionsLock.unlockWrite(stamp); } } void removeConnection(final AbstractClientConnection conn) { - connections.remove(conn.cookie(), conn); - LOG.debug("{}: removed connection {}", persistenceId(), conn); + final long stamp = connectionsLock.writeLock(); + try { + if (!connections.remove(conn.cookie(), conn)) { + final AbstractClientConnection existing = connections.get(conn.cookie()); + if (existing != null) { + LOG.warn("{}: failed to remove connection {}, as it was superseded by {}", persistenceId(), conn, + existing); + } else { + LOG.warn("{}: failed to remove connection {}, as it was not tracked", persistenceId(), conn); + } + } else { + LOG.info("{}: removed connection {}", persistenceId(), conn); + cancelSlicing(conn.cookie()); + } + } finally { + connectionsLock.unlockWrite(stamp); + } } @SuppressWarnings("unchecked") void reconnectConnection(final ConnectedClientConnection oldConn, final ReconnectingClientConnection newConn) { final ReconnectingClientConnection conn = (ReconnectingClientConnection)newConn; - connections.replace(oldConn.cookie(), (AbstractClientConnection)oldConn, conn); - LOG.debug("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn); + LOG.info("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn); + + final long stamp = connectionsLock.writeLock(); + try { + final boolean replaced = connections.replace(oldConn.cookie(), (AbstractClientConnection)oldConn, conn); + if (!replaced) { + final AbstractClientConnection existing = connections.get(oldConn.cookie()); + if (existing != null) { + LOG.warn("{}: failed to replace connection {}, as it was superseded by {}", persistenceId(), conn, + existing); + } else { + LOG.warn("{}: failed to replace connection {}, as it was not tracked", persistenceId(), conn); + } + } else { + cancelSlicing(oldConn.cookie()); + } + } finally { + connectionsLock.unlockWrite(stamp); + } final Long shard = oldConn.cookie(); + LOG.info("{}: refreshing backend for shard {}", persistenceId(), shard); resolver().refreshBackendInfo(shard, conn.getBackendInfo().get()).whenComplete( (backend, failure) -> context().executeInActor(behavior -> { backendConnectFinished(shard, conn, backend, failure); @@ -235,14 +427,42 @@ public abstract class ClientActorBehavior extends })); } + private void cancelSlicing(final Long cookie) { + context().messageSlicer().cancelSlicing(id -> { + try { + return cookie.equals(extractCookie(id)); + } catch (IllegalArgumentException e) { + LOG.debug("extractCookie failed while cancelling slicing for cookie {}", cookie, e); + return false; + } + }); + } + private ConnectingClientConnection createConnection(final Long shard) { - final ConnectingClientConnection conn = new ConnectingClientConnection<>(context(), shard); + final ConnectingClientConnection conn = new ConnectingClientConnection<>(context(), shard, + resolver().resolveCookieName(shard)); + resolveConnection(shard, conn); + return conn; + } + private void resolveConnection(final Long shard, final AbstractClientConnection conn) { + LOG.debug("{}: resolving shard {} connection {}", persistenceId(), shard, conn); resolver().getBackendInfo(shard).whenComplete((backend, failure) -> context().executeInActor(behavior -> { backendConnectFinished(shard, conn, backend, failure); return behavior; })); + } - return conn; + private static class BackendStaleException extends RequestException { + private static final long serialVersionUID = 1L; + + BackendStaleException(final Long shard) { + super("Backend for shard " + shard + " is stale"); + } + + @Override + public boolean isRetriable() { + return false; + } } }