X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractShardBackendResolver.java;h=ca784fed7a73783e81e89812f9a06412e2197552;hb=abaef4a5ae37f27542155457fe7306a4662b1eeb;hp=6b221da76681911edf9155af13b1425f2b3d5017;hpb=bc2b83e97bc73930badd4a3063c65b849f82c664;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java index 6b221da766..ca784fed7a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java @@ -7,23 +7,28 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.util.Timeout; -import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.ThreadSafe; +import java.util.function.Consumer; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; +import org.opendaylight.controller.cluster.access.commands.NotLeaderException; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; @@ -31,7 +36,8 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import org.opendaylight.yangtools.concepts.Registration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Function1; @@ -40,11 +46,13 @@ import scala.compat.java8.FutureConverters; /** * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding - * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}. + * shard leader is resolved via {@link ActorUtils}. The product of resolution is {@link ShardBackendInfo}. + * + *

+ * This class is thread-safe. * * @author Robert Varga */ -@ThreadSafe abstract class AbstractShardBackendResolver extends BackendInfoResolver { static final class ShardState { private final CompletionStage stage; @@ -52,21 +60,21 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver stage) { - this.stage = Preconditions.checkNotNull(stage); + this.stage = requireNonNull(stage); stage.whenComplete(this::onStageResolved); } - @Nonnull CompletionStage getStage() { + @NonNull CompletionStage getStage() { return stage; } - @Nullable synchronized ShardBackendInfo getResult() { + synchronized @Nullable ShardBackendInfo getResult() { return result; } - private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) { + private synchronized void onStageResolved(final ShardBackendInfo info, final Throwable failure) { if (failure == null) { - this.result = Preconditions.checkNotNull(result); + this.result = requireNonNull(info); } else { LOG.warn("Failed to resolve shard", failure); } @@ -83,24 +91,39 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver connectFunction; - private final ActorContext actorContext; + private final ActorUtils actorUtils; + private final Set> staleBackendInfoCallbacks = ConcurrentHashMap.newKeySet(); // FIXME: we really need just ActorContext.findPrimaryShardAsync() - AbstractShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) { - this.actorContext = Preconditions.checkNotNull(actorContext); + AbstractShardBackendResolver(final ClientIdentifier clientId, final ActorUtils actorUtils) { + this.actorUtils = requireNonNull(actorUtils); this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON, ABIVersion.current())); } + @Override + public Registration notifyWhenBackendInfoIsStale(final Consumer callback) { + staleBackendInfoCallbacks.add(callback); + return () -> staleBackendInfoCallbacks.remove(callback); + } + + protected void notifyStaleBackendInfoCallbacks(Long cookie) { + staleBackendInfoCallbacks.forEach(callback -> callback.accept(cookie)); + } + + protected ActorUtils actorUtils() { + return actorUtils; + } + protected final void flushCache(final String shardName) { - actorContext.getPrimaryShardInfoCache().remove(shardName); + actorUtils.getPrimaryShardInfoCache().remove(shardName); } protected final ShardState resolveBackendInfo(final String shardName, final long cookie) { LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); final CompletableFuture future = new CompletableFuture<>(); - FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).whenComplete((info, failure) -> { + FutureConverters.toJava(actorUtils.findPrimaryShardAsync(shardName)).whenComplete((info, failure) -> { if (failure == null) { connectShard(shardName, cookie, info, future); return; @@ -126,7 +149,7 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver { - if (failure != null) { - LOG.debug("Connect attempt to {} failed", shardName, failure); - future.completeExceptionally(failure); - return; - } - if (response instanceof RequestFailure) { - final Throwable cause = ((RequestFailure) response).getCause().unwrap(); - LOG.debug("Connect attempt to {} failed to process", shardName, cause); - future.completeExceptionally(cause); - return; - } - - LOG.debug("Resolved backend information to {}", response); - Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", - response); - final ConnectClientSuccess success = (ConnectClientSuccess) response; - future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(), - success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(), - success.getMaxMessages())); - }); + .whenComplete((response, failure) -> onConnectResponse(shardName, cookie, future, response, failure)); + } + + private void onConnectResponse(final String shardName, final long cookie, + final CompletableFuture future, final Object response, final Throwable failure) { + if (failure != null) { + LOG.debug("Connect attempt to {} failed, will retry", shardName, failure); + future.completeExceptionally(wrap("Connection attempt failed", failure)); + return; + } + if (response instanceof RequestFailure) { + final Throwable cause = ((RequestFailure) response).getCause().unwrap(); + LOG.debug("Connect attempt to {} failed to process", shardName, cause); + final Throwable result = cause instanceof NotLeaderException + ? wrap("Leader moved during establishment", cause) : cause; + future.completeExceptionally(result); + return; + } + + LOG.debug("Resolved backend information to {}", response); + checkArgument(response instanceof ConnectClientSuccess, "Unhandled response %s", response); + final ConnectClientSuccess success = (ConnectClientSuccess) response; + future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(), + success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(), + success.getMaxMessages())); } }