BUG-5280: add BackendInfo/BackendInfoResolver 58/39758/44
authorRobert Varga <rovarga@cisco.com>
Thu, 2 Jun 2016 09:20:51 +0000 (11:20 +0200)
committerRobert Varga <rovarga@cisco.com>
Wed, 22 Jun 2016 15:09:58 +0000 (17:09 +0200)
Client actor needs to be able to resolve a particular backend
so it can implement retry logic with request adaptation. Add
the baseline class and an implementation for current sharding.

Change-Id: Ic7b679b1cadaff130b3a266606fe48cad5c20614
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ShardBackendInfo.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/BackendInfo.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/BackendInfoResolver.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java

index 1e15fef..dd40f1e 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.pattern.ExplicitAskSupport;
 import akka.util.Timeout;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
@@ -18,8 +19,8 @@ import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.actors.client.AbstractClientActor;
-import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
 import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import scala.Function1;
 import scala.concurrent.Await;
 import scala.concurrent.duration.Duration;
@@ -42,19 +43,23 @@ public final class DistributedDataStoreClientActor extends AbstractClientActor {
         }
     };
 
-    private DistributedDataStoreClientActor(final FrontendIdentifier frontendId) {
+    private final ActorContext actorContext;
+
+    private DistributedDataStoreClientActor(final FrontendIdentifier frontendId, final ActorContext actorContext) {
         super(frontendId);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
     }
 
     @Override
-    protected ClientActorBehavior initialBehavior(final ClientActorContext context) {
-        return new DistributedDataStoreClientBehavior(context);
+    protected DistributedDataStoreClientBehavior initialBehavior(final ClientActorContext context) {
+        return new DistributedDataStoreClientBehavior(context, actorContext);
     }
 
-    public static Props props(final @Nonnull MemberName memberName, @Nonnull final String storeName) {
+    public static Props props(final @Nonnull MemberName memberName, @Nonnull final String storeName, final ActorContext ctx) {
         final String name = "DistributedDataStore:storeName='" + storeName + "'";
         final FrontendIdentifier frontendId = FrontendIdentifier.create(memberName, FrontendType.forName(name));
-        return Props.create(DistributedDataStoreClientActor.class, () -> new DistributedDataStoreClientActor(frontendId));
+        return Props.create(DistributedDataStoreClientActor.class,
+            () -> new DistributedDataStoreClientActor(frontendId, ctx));
     }
 
     public static DistributedDataStoreClient getDistributedDataStoreClient(final @Nonnull ActorRef actor,
index cacda93..ff5f882 100644 (file)
@@ -14,6 +14,7 @@ import java.util.concurrent.CompletionStage;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
 import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,10 +47,12 @@ import org.slf4j.LoggerFactory;
 final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
 
+    private final ModuleShardBackendResolver resolver;
     private long nextHistoryId;
 
-    DistributedDataStoreClientBehavior(final ClientActorContext context) {
+    DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
         super(context);
+        resolver = new ModuleShardBackendResolver(actorContext);
     }
 
     //
@@ -105,4 +108,9 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
     public void close() {
         context().executeInActor(this::shutdown);
     }
+
+    @Override
+    protected ModuleShardBackendResolver resolver() {
+        return resolver;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
new file mode 100644 (file)
index 0000000..6f15c72
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.primitives.UnsignedLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfo;
+import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfoResolver;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.compat.java8.FutureConverters;
+
+/**
+ * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
+ * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding
+ * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}.
+ *
+ * @author Robert Varga
+ */
+final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
+    private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
+    /**
+     * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
+     * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
+     * non-operational.
+     */
+    // TODO: maybe make this configurable somehow?
+    private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
+
+    private final ActorContext actorContext;
+
+    private volatile BiMap<String, Long> shards = ImmutableBiMap.of();
+
+    // FIXME: we really need just ActorContext.findPrimaryShardAsync()
+    ModuleShardBackendResolver(final ActorContext actorContext) {
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+    }
+
+    @Override
+    protected void invalidateBackendInfo(final CompletionStage<? extends BackendInfo> info) {
+        LOG.trace("Initiated invalidation of backend information {}", info);
+        info.thenAccept(this::invalidate);
+    }
+
+    private void invalidate(final BackendInfo result) {
+        Preconditions.checkArgument(result instanceof ShardBackendInfo);
+        LOG.debug("Invalidating backend information {}", result);
+        actorContext.getPrimaryShardInfoCache().remove(((ShardBackendInfo)result).getShardName());
+    }
+
+    @Override
+    protected CompletionStage<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+        final String shardName = shards.inverse().get(cookie);
+        if (shardName == null) {
+            LOG.warn("Failing request for non-existent cookie {}", cookie);
+            return CompletableFuture.completedFuture(null);
+        }
+
+        LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
+        return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName))
+                .thenApply(o -> createBackendInfo(o, shardName, cookie));
+    }
+
+    private static ABIVersion toABIVersion(final short version) {
+        switch (version) {
+            case DataStoreVersions.BORON_VERSION:
+                return ABIVersion.BORON;
+        }
+
+        throw new IllegalArgumentException("Unsupported version " + version);
+    }
+
+    private static ShardBackendInfo createBackendInfo(final Object result, final String shardName, final Long cookie) {
+        Preconditions.checkArgument(result instanceof PrimaryShardInfo);
+        final PrimaryShardInfo info = (PrimaryShardInfo) result;
+
+        LOG.debug("Creating backend information for {}", info);
+        return new ShardBackendInfo(info.getPrimaryShardActor().resolveOne(DEAD_TIMEOUT).value().get().get(),
+            toABIVersion(info.getPrimaryShardVersion()), shardName, UnsignedLong.fromLongBits(cookie),
+            info.getLocalShardDataTree());
+     }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ShardBackendInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ShardBackendInfo.java
new file mode 100644 (file)
index 0000000..677ac40
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedLong;
+import java.util.Optional;
+import javax.annotation.concurrent.ThreadSafe;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfo;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Combined backend tracking. Aside from usual {@link BackendInfo}, this object also tracks the cookie assigned
+ * to the shard. This assignment remains constant for as long as the client is not restarted.
+ *
+ * @author Robert Varga
+ */
+@ThreadSafe
+final class ShardBackendInfo extends BackendInfo {
+    private final Optional<DataTree> dataTree;
+    private final UnsignedLong cookie;
+    private final String shardName;
+
+    ShardBackendInfo(final ActorRef actor, final ABIVersion version, final String shardName, final UnsignedLong cookie,
+        final Optional<DataTree> dataTree) {
+        super(actor, version);
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.cookie = Preconditions.checkNotNull(cookie);
+        this.dataTree = Preconditions.checkNotNull(dataTree);
+    }
+
+    UnsignedLong getCookie() {
+        return cookie;
+    }
+
+    Optional<DataTree> getDataTree() {
+        return dataTree;
+    }
+
+    String getShardName() {
+        return shardName;
+    }
+
+    LocalHistoryIdentifier brandHistory(final LocalHistoryIdentifier id) {
+        Preconditions.checkArgument(id.getCookie() == 0, "History %s is already branded", id);
+        return new LocalHistoryIdentifier(id.getClientId(), id.getHistoryId(), cookie.longValue());
+    }
+
+    @Override
+    protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+        return super.addToStringAttributes(toStringHelper).add("cookie", cookie).add("shard", shardName);
+    }
+}
index 0bdb29b..a925e93 100644 (file)
@@ -76,28 +76,14 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
     private final TransactionContextFactory txContextFactory;
 
-    public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
-            Configuration configuration, DatastoreContextFactory datastoreContextFactory,
-            DatastoreSnapshot restoreFromSnapshot) {
+    public DistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+            final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
+            final DatastoreSnapshot restoreFromSnapshot) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
 
-        final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
-            datastoreContextFactory.getBaseDatastoreContext().getDataStoreName());
-        final ActorRef clientActor = actorSystem.actorOf(clientProps);
-        try {
-            client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            LOG.error("Failed to get actor for {}", clientProps, e);
-            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            throw Throwables.propagate(e);
-        }
-
-        identifier = client.getIdentifier();
-        LOG.debug("Distributed data store client {} started", identifier);
-
         String shardManagerId = ShardManagerIdentifier.builder()
                 .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
 
@@ -115,6 +101,20 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
                 shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
 
+        final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+            datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext);
+        final ActorRef clientActor = actorSystem.actorOf(clientProps);
+        try {
+            client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOG.error("Failed to get actor for {}", clientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            throw Throwables.propagate(e);
+        }
+
+        identifier = client.getIdentifier();
+        LOG.debug("Distributed data store client {} started", identifier);
+
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
 
@@ -131,7 +131,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     }
 
     @VisibleForTesting
-    DistributedDataStore(ActorContext actorContext, ClientIdentifier identifier) {
+    DistributedDataStore(final ActorContext actorContext, final ClientIdentifier identifier) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
         this.client = null;
         this.identifier = Preconditions.checkNotNull(identifier);
@@ -140,7 +140,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
     }
 
-    public void setCloseable(AutoCloseable closeable) {
+    public void setCloseable(final AutoCloseable closeable) {
         this.closeable = closeable;
     }
 
@@ -148,8 +148,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     @Override
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
                                               ListenerRegistration<L> registerChangeListener(
-        final YangInstanceIdentifier path, L listener,
-        AsyncDataBroker.DataChangeScope scope) {
+        final YangInstanceIdentifier path, final L listener,
+        final AsyncDataBroker.DataChangeScope scope) {
 
         Preconditions.checkNotNull(path, "path should not be null");
         Preconditions.checkNotNull(listener, "listener should not be null");
@@ -166,7 +166,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     }
 
     @Override
-    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(YangInstanceIdentifier treeId, L listener) {
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
         Preconditions.checkNotNull(treeId, "treeId should not be null");
         Preconditions.checkNotNull(listener, "listener should not be null");
 
@@ -183,7 +183,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
     @Override
     public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
-            DOMDataTreeIdentifier subtree, C cohort) {
+            final DOMDataTreeIdentifier subtree, final C cohort) {
         YangInstanceIdentifier treeId =
                 Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier();
         Preconditions.checkNotNull(cohort, "listener should not be null");
@@ -220,12 +220,12 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     }
 
     @Override
-    public void onGlobalContextUpdated(SchemaContext schemaContext) {
+    public void onGlobalContextUpdated(final SchemaContext schemaContext) {
         actorContext.setSchemaContext(schemaContext);
     }
 
     @Override
-    public void onDatastoreContextUpdated(DatastoreContextFactory contextFactory) {
+    public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) {
         LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName());
 
         actorContext.setDatastoreContext(contextFactory);
@@ -278,8 +278,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         }
     }
 
-    private static ActorRef createShardManager(ActorSystem actorSystem, ShardManagerCreator creator,
-            String shardDispatcher, String shardManagerId) {
+    private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
+            final String shardDispatcher, final String shardManagerId) {
         Exception lastException = null;
 
         for(int i=0;i<100;i++) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/BackendInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/BackendInfo.java
new file mode 100644 (file)
index 0000000..6a4d3e0
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+
+/**
+ * Basic information about how to talk to the backend. ClientActorBehavior uses this information to dispatch requests
+ * to the backend.
+ *
+ * This class is not final so concrete actor behavior implementations may subclass it and track more information about
+ * the backend. The {@link #hashCode()} and {@link #equals(Object)} methods are made final to ensure subclasses compare
+ * on identity.
+ *
+ * @author Robert Varga
+ */
+public class BackendInfo {
+    private final ABIVersion version;
+    private final ActorRef actor;
+
+    protected BackendInfo(final ActorRef actor, final ABIVersion version) {
+        this.version = Preconditions.checkNotNull(version);
+        this.actor = Preconditions.checkNotNull(actor);
+    }
+
+    public final ActorRef getActor() {
+        return actor;
+    }
+
+    public final ABIVersion getVersion() {
+        return version;
+    }
+
+    @Override
+    public final int hashCode() {
+        return super.hashCode();
+    }
+
+    @Override
+    public final boolean equals(final Object obj) {
+        return super.equals(obj);
+    }
+
+    @Override
+    public final String toString() {
+        return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
+    }
+
+    protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+        return toStringHelper.add("actor", actor).add("version", version);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/BackendInfoResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/BackendInfoResolver.java
new file mode 100644 (file)
index 0000000..2868998
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Caching resolver which resolves a cookie to a leader {@link ActorRef}. This class needs to be specialized by the
+ * client. It is used by {@link ClientActorBehavior} for request dispatch. Results are cached until they are invalidated
+ * by either the client actor (when a message timeout is detected) and by the specific frontend (on explicit
+ * invalidation or when updated information becomes available).
+ *
+ * @author Robert Varga
+ */
+@ThreadSafe
+public abstract class BackendInfoResolver<T extends BackendInfo> {
+    private static final Logger LOG = LoggerFactory.getLogger(BackendInfoResolver.class);
+    private final ConcurrentMap<Long, CompletionStage<T>> backends = new ConcurrentHashMap<>();
+
+    // This is what the client needs to start processing. For as long as we do not have this, we should not complete
+    // this stage until we have this information
+    public final CompletionStage<? extends T> getBackendInfo(final long cookie) {
+        return backends.computeIfAbsent(cookie, this::resolveBackendInfo);
+    }
+
+    /**
+     * Invalidate a particular instance of {@link BackendInfo}, typically as a response to a request timing out. If
+     * the provided information is not the one currently cached this method does nothing.
+     *
+     * @param cookie Backend cookie
+     * @param info Previous information to be invalidated
+     */
+    public final void invalidateBackend(final long cookie, final @Nonnull CompletionStage<? extends BackendInfo> info) {
+        if (backends.remove(cookie, Preconditions.checkNotNull(info))) {
+            LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), info);
+            invalidateBackendInfo(info);
+        }
+    }
+
+    /**
+     * Request new resolution of a particular backend identified by a cookie. This method is invoked when a client
+     * requests information which is not currently cached.
+     *
+     * @param cookie Backend cookie
+     * @return A {@link CompletionStage} resulting in information about the backend
+     */
+    protected abstract @Nonnull CompletionStage<T> resolveBackendInfo(final @Nonnull Long cookie);
+
+    /**
+     * Invalidate previously-resolved shard information. This method is invoked when a timeout is detected
+     * and the information may need to be refreshed.
+     *
+     * @param info Previous promise of backend information
+     */
+    protected abstract void invalidateBackendInfo(@Nonnull CompletionStage<? extends BackendInfo> info);
+}
index 7ac06d3..237b570 100644 (file)
@@ -74,4 +74,11 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
      * @return Next behavior to use, null if this actor should shut down.
      */
     protected abstract @Nullable ClientActorBehavior onCommand(@Nonnull Object command);
+
+    /**
+     * Override this method to provide a backend resolver instance.
+     *
+     * @return
+     */
+    protected abstract @Nonnull BackendInfoResolver<?> resolver();
 }