import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
@ThreadSafe
public abstract class BackendInfoResolver<T extends BackendInfo> {
private static final Logger LOG = LoggerFactory.getLogger(BackendInfoResolver.class);
- private final ConcurrentMap<Long, CompletionStage<T>> backends = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, CompletableFuture<T>> backends = new ConcurrentHashMap<>();
- // This is what the client needs to start processing. For as long as we do not have this, we should not complete
- // this stage until we have this information
- public final CompletionStage<? extends T> getBackendInfo(final long cookie) {
- return backends.computeIfAbsent(cookie, this::resolveBackendInfo);
+ /**
+ * Return the currently-resolved backend information, if available. This method is guaranteed not to block, but will
+ * initiate resolution of the information if there is none.
+ *
+ * @param cookie Backend cookie
+ * @return Backend information, if available
+ */
+ public final Optional<T> getFutureBackendInfo(final Long cookie) {
+ final Future<T> f = lookupBackend(cookie);
+ if (f.isDone()) {
+ try {
+ return Optional.of(f.get());
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.debug("Resolution of {} failed", f, e);
+ }
+ }
+
+ return Optional.empty();
}
/**
* requests information which is not currently cached.
*
* @param cookie Backend cookie
- * @return A {@link CompletionStage} resulting in information about the backend
+ * @return A {@link CompletableFuture} resulting in information about the backend
*/
- protected abstract @Nonnull CompletionStage<T> resolveBackendInfo(final @Nonnull Long cookie);
+ protected abstract @Nonnull CompletableFuture<T> resolveBackendInfo(final @Nonnull Long cookie);
/**
* Invalidate previously-resolved shard information. This method is invoked when a timeout is detected
* @param info Previous promise of backend information
*/
protected abstract void invalidateBackendInfo(@Nonnull CompletionStage<? extends BackendInfo> info);
+
+ // This is what the client needs to start processing. For as long as we do not have this, we should not complete
+ // this stage until we have this information
+ final CompletionStage<? extends T> getBackendInfo(final Long cookie) {
+ return lookupBackend(cookie);
+ }
+
+ private CompletableFuture<T> lookupBackend(final Long cookie) {
+ return backends.computeIfAbsent(Preconditions.checkNotNull(cookie), this::resolveBackendInfo);
+ }
}