X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractShardBackendResolver.java;h=d2b10c1b7067a038e90d910b16a0749ff4b76212;hb=db3d7caeeb310f76a9a159f9a8d7e9beff89f645;hp=51f96e18fde4e2f0ea66d2ca41d3f72d9ea58755;hpb=9409f87fa5f6ea0a37384a85bb4e66b974fdd9a7;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java index 51f96e18fd..d2b10c1b70 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java @@ -11,10 +11,14 @@ import akka.actor.ActorRef; import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -23,8 +27,8 @@ import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; +import org.opendaylight.controller.cluster.access.commands.NotLeaderException; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; -import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; @@ -32,6 +36,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.yangtools.concepts.Registration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Function1; @@ -64,9 +69,9 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver connectFunction; private final ActorContext actorContext; + private final Set> staleBackendInfoCallbacks = ConcurrentHashMap.newKeySet(); // FIXME: we really need just ActorContext.findPrimaryShardAsync() AbstractShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) { @@ -92,6 +98,20 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver callback) { + staleBackendInfoCallbacks.add(callback); + return () -> staleBackendInfoCallbacks.remove(callback); + } + + protected void notifyStaleBackendInfoCallbacks(Long cookie) { + staleBackendInfoCallbacks.forEach(callback -> callback.accept(cookie)); + } + + protected ActorContext actorContext() { + return actorContext; + } + protected final void flushCache(final String shardName) { actorContext.getPrimaryShardInfoCache().remove(shardName); } @@ -108,8 +128,7 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver future) { LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info); FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT)) - .whenComplete((response, failure) -> { - if (failure != null) { - LOG.debug("Connect attempt to {} failed", shardName, failure); - future.completeExceptionally(failure); - return; - } - if (response instanceof RequestFailure) { - final RequestException cause = ((RequestFailure) response).getCause(); - LOG.debug("Connect attempt to {} failed to process", shardName, cause); - future.completeExceptionally(cause); - return; - } - - LOG.debug("Resolved backend information to {}", response); - Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", - response); - final ConnectClientSuccess success = (ConnectClientSuccess) response; - future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(), - success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(), - success.getMaxMessages())); - }); + .whenComplete((response, failure) -> onConnectResponse(shardName, cookie, future, response, failure)); + } + + private void onConnectResponse(final String shardName, final long cookie, + final CompletableFuture future, final Object response, final Throwable failure) { + if (failure != null) { + LOG.debug("Connect attempt to {} failed, will retry", shardName, failure); + future.completeExceptionally(wrap("Connection attempt failed", failure)); + return; + } + if (response instanceof RequestFailure) { + final Throwable cause = ((RequestFailure) response).getCause().unwrap(); + LOG.debug("Connect attempt to {} failed to process", shardName, cause); + final Throwable result = cause instanceof NotLeaderException + ? wrap("Leader moved during establishment", cause) : cause; + future.completeExceptionally(result); + return; + } + + LOG.debug("Resolved backend information to {}", response); + Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response %s", + response); + final ConnectClientSuccess success = (ConnectClientSuccess) response; + future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(), + success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(), + success.getMaxMessages())); } }