CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
index 17d988005fadf32c8209837671d43d0aa2981b60..ad05a1ca71001285a8aed1f5afe16b1f511f3e96 100644 (file)
@@ -28,8 +28,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.util.concurrent.RateLimiter;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
@@ -42,11 +47,14 @@ import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,7 +70,7 @@ import scala.concurrent.duration.FiniteDuration;
  * easily. An ActorContext can be freely passed around to local object instances
  * but should not be passed to actors especially remote actors
  */
-public class ActorContext {
+public class ActorContext implements RemovalListener<String, Future<PrimaryShardInfo>> {
     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
     private static final String METRIC_RATE = "rate";
@@ -96,11 +104,13 @@ public class ActorContext {
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
     private final Dispatchers dispatchers;
-    private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
+    private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
 
     private volatile SchemaContext schemaContext;
     private volatile boolean updated;
     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
+    @GuardedBy("shardInfoListeners")
+    private final Collection<ShardInfoListenerRegistration<?>> shardInfoListeners = new ArrayList<>();
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -141,8 +151,9 @@ public class ActorContext {
 
         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
 
-        primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+        primaryShardInfoCache = CacheBuilder.newBuilder()
                 .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+                .removalListener(this)
                 .build();
     }
 
@@ -196,24 +207,24 @@ public class ActorContext {
         return schemaContext;
     }
 
-    public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
-        Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+    public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
+        Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
         if(ret != null){
             return ret;
         }
         Future<Object> future = executeOperationAsync(shardManager,
                 new FindPrimary(shardName, true), shardInitializationTimeout);
 
-        return future.transform(new Mapper<Object, ActorSelection>() {
+        return future.transform(new Mapper<Object, PrimaryShardInfo>() {
             @Override
-            public ActorSelection checkedApply(Object response) throws Exception {
-                if(response instanceof PrimaryFound) {
-                    PrimaryFound found = (PrimaryFound)response;
-
-                    LOG.debug("Primary found {}", found.getPrimaryPath());
-                    ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
-                    primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
-                    return actorSelection;
+            public PrimaryShardInfo checkedApply(Object response) throws Exception {
+                if(response instanceof RemotePrimaryShardFound) {
+                    LOG.debug("findPrimaryShardAsync received: {}", response);
+                    return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null);
+                } else if(response instanceof LocalPrimaryShardFound) {
+                    LOG.debug("findPrimaryShardAsync received: {}", response);
+                    LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
+                    return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree());
                 } else if(response instanceof NotInitializedException) {
                     throw (NotInitializedException)response;
                 } else if(response instanceof PrimaryNotFoundException) {
@@ -228,6 +239,20 @@ public class ActorContext {
         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
     }
 
+    private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
+            DataTree localShardDataTree) {
+        ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
+        PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
+        primaryShardInfoCache.put(shardName, Futures.successful(info));
+
+        synchronized (shardInfoListeners) {
+            for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
+                reg.getInstance().onShardInfoUpdated(shardName, info);
+            }
+        }
+        return info;
+    }
+
     /**
      * Finds a local shard given its shard name and return it's ActorRef
      *
@@ -367,8 +392,7 @@ public class ActorContext {
     }
 
     public void shutdown() {
-        shardManager.tell(PoisonPill.getInstance(), null);
-        actorSystem.shutdown();
+        shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
     public ClusterWrapper getClusterWrapper() {
@@ -387,15 +411,15 @@ public class ActorContext {
     public void broadcast(final Object message){
         for(final String shardName : configuration.getAllShardNames()){
 
-            Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
-            primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+            Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
+            primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
                 @Override
-                public void onComplete(Throwable failure, ActorSelection primaryShard) {
+                public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
                     if(failure != null) {
                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
                                 message.getClass().getSimpleName(), shardName, failure);
                     } else {
-                        primaryShard.tell(message, ActorRef.noSender());
+                        primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
                     }
                 }
             }, getClientDispatcher());
@@ -553,7 +577,31 @@ public class ActorContext {
     }
 
     @VisibleForTesting
-    Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
-        return primaryShardActorSelectionCache;
+    Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
+        return primaryShardInfoCache;
+    }
+
+    public <T extends ShardInfoListener> ShardInfoListenerRegistration<T> registerShardInfoListener(final T listener) {
+        final ShardInfoListenerRegistration<T> reg = new ShardInfoListenerRegistration<T>(listener, this);
+
+        synchronized (shardInfoListeners) {
+            shardInfoListeners.add(reg);
+        }
+        return reg;
+    }
+
+    protected void removeShardInfoListener(final ShardInfoListenerRegistration<?> registration) {
+        synchronized (shardInfoListeners) {
+            shardInfoListeners.remove(registration);
+        }
+    }
+
+    @Override
+    public void onRemoval(final RemovalNotification<String, Future<PrimaryShardInfo>> notification) {
+        synchronized (shardInfoListeners) {
+            for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
+                reg.getInstance().onShardInfoUpdated(notification.getKey(), null);
+            }
+        }
     }
 }