X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorBehavior.java;h=14ca1ef38c8f850071d3e1b04f520faa7a496b69;hb=refs%2Fchanges%2F05%2F83005%2F5;hp=fa2e3b76d8a038497d57efd7e344498862717d06;hpb=634dfac8eead60f443bf75e749c70d1f2bb29198;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index fa2e3b76d8..14ca1ef38c 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -7,8 +7,9 @@ */ package org.opendaylight.controller.cluster.access.client; +import static java.util.Objects.requireNonNull; + import com.google.common.annotations.Beta; -import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.base.Verify; import java.util.Collection; @@ -17,9 +18,9 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.NotLeaderException; import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; @@ -37,6 +38,7 @@ import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.concepts.Registration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; @@ -60,7 +62,7 @@ public abstract class ClientActorBehavior extends * @param enqueuedEntries Previously-enqueued entries * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns. */ - @Nonnull ReconnectForwarder finishReconnect(@Nonnull Collection enqueuedEntries); + @NonNull ReconnectForwarder finishReconnect(@NonNull Collection enqueuedEntries); } private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); @@ -83,28 +85,41 @@ public abstract class ClientActorBehavior extends private final InversibleLock connectionsLock = new InversibleLock(); private final BackendInfoResolver resolver; private final MessageAssembler responseMessageAssembler; + private final Registration staleBackendInfoReg; - protected ClientActorBehavior(@Nonnull final ClientActorContext context, - @Nonnull final BackendInfoResolver resolver) { + protected ClientActorBehavior(final @NonNull ClientActorContext context, + final @NonNull BackendInfoResolver resolver) { super(context); - this.resolver = Preconditions.checkNotNull(resolver); + this.resolver = requireNonNull(resolver); final ClientActorConfig config = context.config(); responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId()) .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(), config.getTempFileDirectory())) .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build(); + + staleBackendInfoReg = resolver.notifyWhenBackendInfoIsStale(shard -> { + context().executeInActor(behavior -> { + LOG.debug("BackendInfo for shard {} is now stale", shard); + final AbstractClientConnection conn = connections.get(shard); + if (conn instanceof ConnectedClientConnection) { + conn.reconnect(this, new BackendStaleException(shard)); + } + return behavior; + }); + }); } @Override - @Nonnull public final ClientIdentifier getIdentifier() { return context().getIdentifier(); } @Override public void close() { + super.close(); responseMessageAssembler.close(); + staleBackendInfoReg.close(); } /** @@ -260,7 +275,7 @@ public abstract class ClientActorBehavior extends * * @param cause Failure cause */ - protected abstract void haltClient(@Nonnull Throwable cause); + protected abstract void haltClient(@NonNull Throwable cause); /** * Override this method to handle any command which is not handled by the base behavior. @@ -268,15 +283,14 @@ public abstract class ClientActorBehavior extends * @param command the command to process * @return Next behavior to use, null if this actor should shut down. */ - @Nullable - protected abstract ClientActorBehavior onCommand(@Nonnull Object command); + protected abstract @Nullable ClientActorBehavior onCommand(@NonNull Object command); /** * Override this method to provide a backend resolver instance. * * @return a backend resolver instance */ - protected final @Nonnull BackendInfoResolver resolver() { + protected final @NonNull BackendInfoResolver resolver() { return resolver; } @@ -287,8 +301,8 @@ public abstract class ClientActorBehavior extends * @param newConn New connection * @return ConnectionConnectCohort which will be used to complete the process of bringing the connection up. */ - @GuardedBy("connectionsLock") - @Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection newConn); + @Holding("connectionsLock") + protected abstract @NonNull ConnectionConnectCohort connectionUp(@NonNull ConnectedClientConnection newConn); private void backendConnectFinished(final Long shard, final AbstractClientConnection oldConn, final T backend, final Throwable failure) { @@ -425,7 +439,8 @@ public abstract class ClientActorBehavior extends } private ConnectingClientConnection createConnection(final Long shard) { - final ConnectingClientConnection conn = new ConnectingClientConnection<>(context(), shard); + final ConnectingClientConnection conn = new ConnectingClientConnection<>(context(), shard, + resolver().resolveCookieName(shard)); resolveConnection(shard, conn); return conn; } @@ -437,4 +452,17 @@ public abstract class ClientActorBehavior extends return behavior; })); } + + private static class BackendStaleException extends RequestException { + private static final long serialVersionUID = 1L; + + BackendStaleException(final Long shard) { + super("Backend for shard " + shard + " is stale"); + } + + @Override + public boolean isRetriable() { + return false; + } + } }