*/
package org.opendaylight.controller.cluster.access.client;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.Beta;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Verify;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
* @param enqueuedEntries Previously-enqueued entries
* @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns.
*/
- @Nonnull ReconnectForwarder finishReconnect(@Nonnull Collection<ConnectionEntry> enqueuedEntries);
+ @NonNull ReconnectForwarder finishReconnect(@NonNull Collection<ConnectionEntry> enqueuedEntries);
}
private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
private final InversibleLock connectionsLock = new InversibleLock();
private final BackendInfoResolver<T> resolver;
private final MessageAssembler responseMessageAssembler;
+ private final Registration staleBackendInfoReg;
- protected ClientActorBehavior(@Nonnull final ClientActorContext context,
- @Nonnull final BackendInfoResolver<T> resolver) {
+ protected ClientActorBehavior(final @NonNull ClientActorContext context,
+ final @NonNull BackendInfoResolver<T> resolver) {
super(context);
- this.resolver = Preconditions.checkNotNull(resolver);
+ this.resolver = requireNonNull(resolver);
final ClientActorConfig config = context.config();
responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId())
.fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
config.getTempFileDirectory()))
.assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build();
+
+ staleBackendInfoReg = resolver.notifyWhenBackendInfoIsStale(shard -> {
+ context().executeInActor(behavior -> {
+ LOG.debug("BackendInfo for shard {} is now stale", shard);
+ final AbstractClientConnection<T> conn = connections.get(shard);
+ if (conn instanceof ConnectedClientConnection) {
+ conn.reconnect(this, new BackendStaleException(shard));
+ }
+ return behavior;
+ });
+ });
}
@Override
- @Nonnull
public final ClientIdentifier getIdentifier() {
return context().getIdentifier();
}
@Override
public void close() {
+ super.close();
responseMessageAssembler.close();
+ staleBackendInfoReg.close();
}
/**
*
* @param cause Failure cause
*/
- protected abstract void haltClient(@Nonnull Throwable cause);
+ protected abstract void haltClient(@NonNull Throwable cause);
/**
* Override this method to handle any command which is not handled by the base behavior.
* @param command the command to process
* @return Next behavior to use, null if this actor should shut down.
*/
- @Nullable
- protected abstract ClientActorBehavior<T> onCommand(@Nonnull Object command);
+ protected abstract @Nullable ClientActorBehavior<T> onCommand(@NonNull Object command);
/**
* Override this method to provide a backend resolver instance.
*
* @return a backend resolver instance
*/
- protected final @Nonnull BackendInfoResolver<T> resolver() {
+ protected final @NonNull BackendInfoResolver<T> resolver() {
return resolver;
}
* @return ConnectionConnectCohort which will be used to complete the process of bringing the connection up.
*/
@GuardedBy("connectionsLock")
- @Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection<T> newConn);
+ protected abstract @NonNull ConnectionConnectCohort connectionUp(@NonNull ConnectedClientConnection<T> newConn);
private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> oldConn,
final T backend, final Throwable failure) {
}
private ConnectingClientConnection<T> createConnection(final Long shard) {
- final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard);
+ final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard,
+ resolver().resolveCookieName(shard));
resolveConnection(shard, conn);
return conn;
}
return behavior;
}));
}
+
+ private static class BackendStaleException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ BackendStaleException(final Long shard) {
+ super("Backend for shard " + shard + " is stale");
+ }
+
+ @Override
+ public boolean isRetriable() {
+ return false;
+ }
+ }
}