*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import akka.actor.ActorRef;
-import akka.util.Timeout;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableBiMap.Builder;
-import com.google.common.primitives.UnsignedLong;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
-import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
-import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
-import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
-import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.compat.java8.FutureConverters;
/**
* {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
*
* @author Robert Varga
*/
-@SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
- justification = "Pertains to the NULL_FUTURE field below. Null is allowed and is intended")
@ThreadSafe
-final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
- private static final class Entry {
- private final CompletionStage<ShardBackendInfo> stage;
- @GuardedBy("this")
- private ShardBackendInfo result;
-
- Entry(final CompletionStage<ShardBackendInfo> stage) {
- this.stage = Preconditions.checkNotNull(stage);
- stage.whenComplete(this::onStageResolved);
- }
-
- @Nonnull CompletionStage<ShardBackendInfo> getStage() {
- return stage;
- }
-
- synchronized @Nullable ShardBackendInfo getResult() {
- return result;
- }
-
- private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) {
- if (failure == null) {
- this.result = Preconditions.checkNotNull(result);
- } else {
- LOG.warn("Failed to resolve shard", failure);
- }
- }
- }
-
- private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
+final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
- /**
- * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
- * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
- * non-operational.
- */
- // TODO: maybe make this configurable somehow?
- private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
-
- private final ConcurrentMap<Long, Entry> backends = new ConcurrentHashMap<>();
- private final AtomicLong nextSessionId = new AtomicLong();
- private final Function1<ActorRef, ?> connectFunction;
+ private final ConcurrentMap<Long, ShardState> backends = new ConcurrentHashMap<>();
private final ActorContext actorContext;
@GuardedBy("this")
// FIXME: we really need just ActorContext.findPrimaryShardAsync()
ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
+ super(clientId, actorContext);
this.actorContext = Preconditions.checkNotNull(actorContext);
- this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
- ABIVersion.current()));
}
Long resolveShardForPath(final YangInstanceIdentifier path) {
return cookie;
}
- private CompletionStage<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+ private ShardState resolveBackendInfo(final Long cookie) {
final String shardName = shards.inverse().get(cookie);
if (shardName == null) {
LOG.warn("Failing request for non-existent cookie {}", cookie);
- return NULL_FUTURE;
+ return null;
}
LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
- return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
- LOG.debug("Looking up primary info for {} from {}", shardName, info);
- return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
- }).thenApply(response -> {
- if (response instanceof RequestFailure) {
- final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
- LOG.debug("Connect request failed {}", failure, failure.getCause());
- throw Throwables.propagate(failure.getCause());
- }
-
- LOG.debug("Resolved backend information to {}", response);
-
- Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
- final ConnectClientSuccess success = (ConnectClientSuccess) response;
-
- return new ShardBackendInfo(success.getBackend(),
- nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
- success.getDataTree(), success.getMaxMessages());
- });
+ return resolveBackendInfo(shardName, cookie);
}
@Override
- public CompletionStage<? extends ShardBackendInfo> getBackendInfo(final Long cookie) {
- return backends.computeIfAbsent(cookie, key -> new Entry(resolveBackendInfo(key))).getStage();
+ public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
+ return backends.computeIfAbsent(cookie, this::resolveBackendInfo).getStage();
}
@Override
- public CompletionStage<? extends ShardBackendInfo> refreshBackendInfo(final Long cookie,
+ public CompletionStage<ShardBackendInfo> refreshBackendInfo(final Long cookie,
final ShardBackendInfo staleInfo) {
- final Entry existing = backends.get(cookie);
+ final ShardState existing = backends.get(cookie);
if (existing != null) {
if (!staleInfo.equals(existing.getResult())) {
return existing.getStage();
}
LOG.debug("Invalidating backend information {}", staleInfo);
- actorContext.getPrimaryShardInfoCache().remove(staleInfo.getShardName());
+ flushCache(staleInfo.getShardName());
- LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), staleInfo);
+ LOG.trace("Invalidated cache %s", staleInfo);
backends.remove(cookie, existing);
}