import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
private final AtomicLong nextSessionId = new AtomicLong();
private final Function1<ActorRef, ?> connectFunction;
private final ActorContext actorContext;
+ private final Set<Consumer<Long>> staleBackendInfoCallbacks = ConcurrentHashMap.newKeySet();
// FIXME: we really need just ActorContext.findPrimaryShardAsync()
AbstractShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
ABIVersion.current()));
}
+ @Override
+ public Registration notifyWhenBackendInfoIsStale(final Consumer<Long> callback) {
+ staleBackendInfoCallbacks.add(callback);
+ return () -> staleBackendInfoCallbacks.remove(callback);
+ }
+
+ protected void notifyStaleBackendInfoCallbacks(Long cookie) {
+ staleBackendInfoCallbacks.forEach(callback -> callback.accept(cookie));
+ }
+
+ protected ActorContext actorContext() {
+ return actorContext;
+ }
+
protected final void flushCache(final String shardName) {
actorContext.getPrimaryShardInfoCache().remove(shardName);
}