X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;h=6856c033220822ccb48d1ad456ae0b0902462f34;hb=d0284b051af618b3d68ce95e0084ae391a7f2dbc;hp=358704524a07451da1e72c26bec9192f2fd71e06;hpb=b99dc64f4c2373e28c3c94c11cedad0e5f7abe1d;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index 358704524a..6856c03322 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -7,8 +7,8 @@ */ package org.opendaylight.controller.cluster.access.client; -import com.google.common.annotations.Beta; -import com.google.common.base.Preconditions; +import static java.util.Objects.requireNonNull; + import com.google.common.base.Stopwatch; import com.google.common.base.Verify; import java.util.Collection; @@ -17,9 +17,9 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.NotLeaderException; import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; @@ -36,17 +36,15 @@ import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherTy import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.concepts.WritableIdentifier; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.concepts.Registration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; /** * A behavior, which handles messages sent to a {@link AbstractClientActor}. - * - * @author Robert Varga */ -@Beta public abstract class ClientActorBehavior extends RecoveredClientActorBehavior implements Identifiable { /** @@ -60,11 +58,11 @@ public abstract class ClientActorBehavior extends * @param enqueuedEntries Previously-enqueued entries * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns. */ - @Nonnull ReconnectForwarder finishReconnect(@Nonnull Collection enqueuedEntries); + @NonNull ReconnectForwarder finishReconnect(@NonNull Collection enqueuedEntries); } private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); - private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(5, TimeUnit.SECONDS); + private static final FiniteDuration RESOLVE_RETRY_DURATION = FiniteDuration.apply(1, TimeUnit.SECONDS); /** * Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations @@ -83,28 +81,41 @@ public abstract class ClientActorBehavior extends private final InversibleLock connectionsLock = new InversibleLock(); private final BackendInfoResolver resolver; private final MessageAssembler responseMessageAssembler; + private final Registration staleBackendInfoReg; - protected ClientActorBehavior(@Nonnull final ClientActorContext context, - @Nonnull final BackendInfoResolver resolver) { + protected ClientActorBehavior(final @NonNull ClientActorContext context, + final @NonNull BackendInfoResolver resolver) { super(context); - this.resolver = Preconditions.checkNotNull(resolver); + this.resolver = requireNonNull(resolver); final ClientActorConfig config = context.config(); responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId()) .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(), config.getTempFileDirectory())) .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build(); + + staleBackendInfoReg = resolver.notifyWhenBackendInfoIsStale(shard -> { + context().executeInActor(behavior -> { + LOG.debug("BackendInfo for shard {} is now stale", shard); + final AbstractClientConnection conn = connections.get(shard); + if (conn instanceof ConnectedClientConnection) { + conn.reconnect(this, new BackendStaleException(shard)); + } + return behavior; + }); + }); } @Override - @Nonnull public final ClientIdentifier getIdentifier() { return context().getIdentifier(); } @Override public void close() { + super.close(); responseMessageAssembler.close(); + staleBackendInfoReg.close(); } /** @@ -137,12 +148,11 @@ public abstract class ClientActorBehavior extends return ((InternalCommand) command).execute(this); } - if (command instanceof SuccessEnvelope) { - return onRequestSuccess((SuccessEnvelope) command); + if (command instanceof SuccessEnvelope successEnvelope) { + return onRequestSuccess(successEnvelope); } - - if (command instanceof FailureEnvelope) { - return internalOnRequestFailure((FailureEnvelope) command); + if (command instanceof FailureEnvelope failureEnvelope) { + return internalOnRequestFailure(failureEnvelope); } if (MessageAssembler.isHandledMessage(command)) { @@ -151,14 +161,18 @@ public abstract class ClientActorBehavior extends return this; } + if (context().messageSlicer().handleMessage(command)) { + return this; + } + return onCommand(command); } - private static long extractCookie(final WritableIdentifier id) { - if (id instanceof TransactionIdentifier) { - return ((TransactionIdentifier) id).getHistoryId().getCookie(); - } else if (id instanceof LocalHistoryIdentifier) { - return ((LocalHistoryIdentifier) id).getCookie(); + private static long extractCookie(final Identifier id) { + if (id instanceof TransactionIdentifier transactionId) { + return transactionId.getHistoryId().getCookie(); + } else if (id instanceof LocalHistoryIdentifier historyId) { + return historyId.getCookie(); } else { throw new IllegalArgumentException("Unhandled identifier " + id); } @@ -245,6 +259,8 @@ public abstract class ClientActorBehavior extends } finally { connectionsLock.unlockWrite(stamp); } + + context().messageSlicer().close(); } /** @@ -254,7 +270,7 @@ public abstract class ClientActorBehavior extends * * @param cause Failure cause */ - protected abstract void haltClient(@Nonnull Throwable cause); + protected abstract void haltClient(@NonNull Throwable cause); /** * Override this method to handle any command which is not handled by the base behavior. @@ -262,15 +278,14 @@ public abstract class ClientActorBehavior extends * @param command the command to process * @return Next behavior to use, null if this actor should shut down. */ - @Nullable - protected abstract ClientActorBehavior onCommand(@Nonnull Object command); + protected abstract @Nullable ClientActorBehavior onCommand(@NonNull Object command); /** * Override this method to provide a backend resolver instance. * * @return a backend resolver instance */ - protected final @Nonnull BackendInfoResolver resolver() { + protected final @NonNull BackendInfoResolver resolver() { return resolver; } @@ -281,25 +296,25 @@ public abstract class ClientActorBehavior extends * @param newConn New connection * @return ConnectionConnectCohort which will be used to complete the process of bringing the connection up. */ - @GuardedBy("connectionsLock") - @Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection newConn); + @Holding("connectionsLock") + protected abstract @NonNull ConnectionConnectCohort connectionUp(@NonNull ConnectedClientConnection newConn); - private void backendConnectFinished(final Long shard, final AbstractClientConnection conn, + private void backendConnectFinished(final Long shard, final AbstractClientConnection oldConn, final T backend, final Throwable failure) { if (failure != null) { if (failure instanceof TimeoutException) { - if (!conn.equals(connections.get(shard))) { + if (!oldConn.equals(connections.get(shard))) { // AbstractClientConnection will remove itself when it decides there is no point in continuing, // at which point we want to stop retrying - LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, conn, - failure); + LOG.info("{}: stopping resolution of shard {} on stale connection {}", persistenceId(), shard, + oldConn, failure); return; } LOG.debug("{}: timed out resolving shard {}, scheduling retry in {}", persistenceId(), shard, RESOLVE_RETRY_DURATION, failure); context().executeInActor(b -> { - resolveConnection(shard, conn); + resolveConnection(shard, oldConn); return b; }, RESOLVE_RETRY_DURATION); return; @@ -307,13 +322,13 @@ public abstract class ClientActorBehavior extends LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure); final RequestException cause; - if (failure instanceof RequestException) { - cause = (RequestException) failure; + if (failure instanceof RequestException requestException) { + cause = requestException; } else { cause = new RuntimeRequestException("Failed to resolve shard " + shard, failure); } - conn.poison(cause); + oldConn.poison(cause); return; } @@ -323,29 +338,31 @@ public abstract class ClientActorBehavior extends final Stopwatch sw = Stopwatch.createStarted(); // Create a new connected connection - final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), - conn.cookie(), backend); - LOG.info("{}: resolving connection {} to {}", persistenceId(), conn, newConn); + final ConnectedClientConnection newConn = new ConnectedClientConnection<>(oldConn, backend); + LOG.info("{}: resolving connection {} to {}", persistenceId(), oldConn, newConn); // Start reconnecting without the old connection lock held final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn)); // Lock the old connection and get a reference to its entries - final Collection replayIterable = conn.startReplay(); + final Collection replayIterable = oldConn.startReplay(); // Finish the connection attempt final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable)); + // Cancel sleep debt after entries were replayed, before new connection starts receiving. + newConn.cancelDebt(); + // Install the forwarder, unlocking the old connection - conn.finishReplay(forwarder); + oldConn.finishReplay(forwarder); // Make sure new lookups pick up the new connection - if (!connections.replace(shard, conn, newConn)) { - final AbstractClientConnection existing = connections.get(conn.cookie()); + if (!connections.replace(shard, oldConn, newConn)) { + final AbstractClientConnection existing = connections.get(oldConn.cookie()); LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo", - persistenceId(), conn, existing, newConn); + persistenceId(), oldConn, existing, newConn); } else { - LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), conn, newConn, sw); + LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), oldConn, newConn, sw); } } finally { connectionsLock.unlockWrite(stamp); @@ -365,6 +382,7 @@ public abstract class ClientActorBehavior extends } } else { LOG.info("{}: removed connection {}", persistenceId(), conn); + cancelSlicing(conn.cookie()); } } finally { connectionsLock.unlockWrite(stamp); @@ -388,6 +406,8 @@ public abstract class ClientActorBehavior extends } else { LOG.warn("{}: failed to replace connection {}, as it was not tracked", persistenceId(), conn); } + } else { + cancelSlicing(oldConn.cookie()); } } finally { connectionsLock.unlockWrite(stamp); @@ -402,8 +422,20 @@ public abstract class ClientActorBehavior extends })); } + private void cancelSlicing(final Long cookie) { + context().messageSlicer().cancelSlicing(id -> { + try { + return cookie.equals(extractCookie(id)); + } catch (IllegalArgumentException e) { + LOG.debug("extractCookie failed while cancelling slicing for cookie {}", cookie, e); + return false; + } + }); + } + private ConnectingClientConnection createConnection(final Long shard) { - final ConnectingClientConnection conn = new ConnectingClientConnection<>(context(), shard); + final ConnectingClientConnection conn = new ConnectingClientConnection<>(context(), shard, + resolver().resolveCookieName(shard)); resolveConnection(shard, conn); return conn; } @@ -415,4 +447,17 @@ public abstract class ClientActorBehavior extends return behavior; })); } + + private static class BackendStaleException extends RequestException { + private static final long serialVersionUID = 1L; + + BackendStaleException(final Long shard) { + super("Backend for shard " + shard + " is stale"); + } + + @Override + public boolean isRetriable() { + return false; + } + } }