tell-based - reconnect on leader change
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractShardBackendResolver.java
index faffe0a3764985f5439e13867928a1ee2a1f2511..d2b10c1b7067a038e90d910b16a0749ff4b76212 100644 (file)
@@ -11,11 +11,14 @@ import akka.actor.ActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -33,6 +36,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Function1;
@@ -85,6 +89,7 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
     private final AtomicLong nextSessionId = new AtomicLong();
     private final Function1<ActorRef, ?> connectFunction;
     private final ActorContext actorContext;
+    private final Set<Consumer<Long>> staleBackendInfoCallbacks = ConcurrentHashMap.newKeySet();
 
     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
     AbstractShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
@@ -93,6 +98,20 @@ abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBac
             ABIVersion.current()));
     }
 
+    @Override
+    public Registration notifyWhenBackendInfoIsStale(final Consumer<Long> callback) {
+        staleBackendInfoCallbacks.add(callback);
+        return () -> staleBackendInfoCallbacks.remove(callback);
+    }
+
+    protected void notifyStaleBackendInfoCallbacks(Long cookie) {
+        staleBackendInfoCallbacks.forEach(callback -> callback.accept(cookie));
+    }
+
+    protected ActorContext actorContext() {
+        return actorContext;
+    }
+
     protected final void flushCache(final String shardName) {
         actorContext.getPrimaryShardInfoCache().remove(shardName);
     }