import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.controller.cluster.access.ABIVersion;
-import org.opendaylight.controller.cluster.access.client.BackendInfo;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
*/
@SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
justification = "Pertains to the NULL_FUTURE field below. Null is allowed and is intended")
+@ThreadSafe
final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
+ private static final class Entry {
+ private final CompletionStage<ShardBackendInfo> stage;
+ @GuardedBy("this")
+ private ShardBackendInfo result;
+
+ Entry(final CompletionStage<ShardBackendInfo> stage) {
+ this.stage = Preconditions.checkNotNull(stage);
+ stage.whenComplete(this::onStageResolved);
+ }
+
+ @Nonnull CompletionStage<ShardBackendInfo> getStage() {
+ return stage;
+ }
+
+ synchronized @Nullable ShardBackendInfo getResult() {
+ return result;
+ }
+
+ private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) {
+ if (failure == null) {
+ this.result = Preconditions.checkNotNull(result);
+ } else {
+ LOG.warn("Failed to resolve shard", failure);
+ }
+ }
+ }
+
private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
// TODO: maybe make this configurable somehow?
private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
- private final ActorContext actorContext;
- // FIXME: this counter should be in superclass somewhere
+ private final ConcurrentMap<Long, Entry> backends = new ConcurrentHashMap<>();
private final AtomicLong nextSessionId = new AtomicLong();
private final Function1<ActorRef, ?> connectFunction;
+ private final ActorContext actorContext;
@GuardedBy("this")
private long nextShard = 1;
ABIVersion.current()));
}
- @Override
- protected void invalidateBackendInfo(final CompletionStage<? extends BackendInfo> info) {
- LOG.trace("Initiated invalidation of backend information {}", info);
- info.thenAccept(this::invalidate);
- }
-
- private void invalidate(final BackendInfo result) {
- Preconditions.checkArgument(result instanceof ShardBackendInfo);
- LOG.debug("Invalidating backend information {}", result);
- actorContext.getPrimaryShardInfoCache().remove(((ShardBackendInfo)result).getShardName());
- }
-
Long resolveShardForPath(final YangInstanceIdentifier path) {
final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
Long cookie = shards.get(shardName);
return cookie;
}
- @Override
- protected CompletableFuture<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+ private CompletionStage<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
final String shardName = shards.inverse().get(cookie);
if (shardName == null) {
LOG.warn("Failing request for non-existent cookie {}", cookie);
return NULL_FUTURE;
}
- final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<>();
+ LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
- FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
+ return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
LOG.debug("Looking up primary info for {} from {}", shardName, info);
return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
}).thenApply(response -> {
return new ShardBackendInfo(success.getBackend(),
nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
success.getDataTree(), success.getMaxMessages());
- }).whenComplete((info, throwablw) -> {
- if (throwablw != null) {
- ret.completeExceptionally(throwablw);
- } else {
- ret.complete(info);
- }
});
+ }
- LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
- return ret;
+ @Override
+ public CompletionStage<? extends ShardBackendInfo> getBackendInfo(final Long cookie) {
+ return backends.computeIfAbsent(cookie, key -> new Entry(resolveBackendInfo(key))).getStage();
+ }
+
+ @Override
+ public CompletionStage<? extends ShardBackendInfo> refreshBackendInfo(final Long cookie,
+ final ShardBackendInfo staleInfo) {
+ final Entry existing = backends.get(cookie);
+ if (existing != null) {
+ if (!staleInfo.equals(existing.getResult())) {
+ return existing.getStage();
+ }
+
+ LOG.debug("Invalidating backend information {}", staleInfo);
+ actorContext.getPrimaryShardInfoCache().remove(staleInfo.getShardName());
+
+ LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), staleInfo);
+ backends.remove(cookie, existing);
+ }
+
+ return getBackendInfo(cookie);
}
}