Bug 3194: Dynamically update PrimaryShardInfo cache when leader changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index cff44b13cb3edb1756ed6159e95d8fe259f91fde..63d4c9826b0835de095103c3c463c1d811e5ecae 100644 (file)
@@ -24,6 +24,7 @@ import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
@@ -54,17 +55,21 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -116,10 +121,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final CountDownLatch waitTillReadyCountdownLatch;
 
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache;
+
     /**
      */
     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
+            PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
@@ -130,6 +138,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+        this.primaryShardInfoCache = primaryShardInfoCache;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -145,13 +154,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ClusterWrapper cluster,
         final Configuration configuration,
         final DatastoreContext datastoreContext,
-        final CountDownLatch waitTillReadyCountdownLatch) {
+        final CountDownLatch waitTillReadyCountdownLatch,
+        final PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+        Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
 
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch));
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
+                waitTillReadyCountdownLatch, primaryShardInfoCache));
     }
 
     @Override
@@ -185,27 +197,34 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
         } else if(message instanceof ShardNotInitializedTimeout) {
             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
-        } else if(message instanceof LeaderStateChanged) {
-            onLeaderStateChanged((LeaderStateChanged)message);
+        } else if(message instanceof ShardLeaderStateChanged) {
+            onLeaderStateChanged((ShardLeaderStateChanged)message);
         } else {
             unknownMessage(message);
         }
 
     }
 
-    private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+    private void checkReady(){
+        if (isReadyWithLeaderId()) {
+            LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+                    persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+            waitTillReadyCountdownLatch.countDown();
+        }
+    }
+
+    private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
 
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
-            shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
-            if (isReadyWithLeaderId()) {
-                LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                        persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-                waitTillReadyCountdownLatch.countDown();
+            shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
+            if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+                primaryShardInfoCache.remove(shardInformation.getShardName());
             }
 
+            checkReady();
         } else {
             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
         }
@@ -249,14 +268,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setRole(roleChanged.getNewRole());
-
-            if (isReadyWithLeaderId()) {
-                LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                        persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-                waitTillReadyCountdownLatch.countDown();
-            }
-
+            checkReady();
             mBean.setSyncStatus(isInSync());
         }
     }
@@ -439,6 +451,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
                 getShardActorPath(shardName, memberName), getSelf());
         }
+
+        checkReady();
     }
 
     private void onDatastoreContext(DatastoreContext context) {
@@ -510,6 +524,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
 
         final String shardName = message.getShardName();
+        final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
 
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
@@ -517,7 +532,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
-                    Object found = new PrimaryFound(info.getSerializedLeaderActor());
+                    String primaryPath = info.getSerializedLeaderActor();
+                    Object found = canReturnLocalShardState && info.isLeader() ?
+                            new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+                                new RemotePrimaryShardFound(primaryPath);
 
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
@@ -537,7 +555,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
                         shardName, path);
 
-                getContext().actorSelection(path).forward(message, getContext());
+                getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
+                        message.isWaitUntilReady()), getContext());
                 return;
             }
         }
@@ -665,6 +684,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private ActorRef actor;
         private ActorPath actorPath;
         private final Map<String, String> peerAddresses;
+        private Optional<DataTree> localShardDataTree;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
@@ -703,6 +723,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardId;
         }
 
+        void setLocalDataTree(Optional<DataTree> localShardDataTree) {
+            this.localShardDataTree = localShardDataTree;
+        }
+
+        Optional<DataTree> getLocalShardDataTree() {
+            return localShardDataTree;
+        }
+
         Map<String, String> getPeerAddresses() {
             return peerAddresses;
         }
@@ -731,7 +759,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId));
+            return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
         }
 
         boolean isShardInitialized() {
@@ -809,10 +837,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return false;
         }
 
-        void setLeaderId(String leaderId) {
+        boolean setLeaderId(String leaderId) {
+            boolean changed = !Objects.equal(this.leaderId, leaderId);
             this.leaderId = leaderId;
 
             notifyOnShardInitializedCallbacks();
+
+            return changed;
         }
     }
 
@@ -823,18 +854,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final Configuration configuration;
         final DatastoreContext datastoreContext;
         private final CountDownLatch waitTillReadyCountdownLatch;
+        private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-        ShardManagerCreator(ClusterWrapper cluster,
-                            Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
+                CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
             this.cluster = cluster;
             this.configuration = configuration;
             this.datastoreContext = datastoreContext;
             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            this.primaryShardInfoCache = primaryShardInfoCache;
         }
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
+                    primaryShardInfoCache);
         }
     }