CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index bc4c825351cc72148f5276fc28d5a94e2e64f79d..f4fa7b3a97e8617f8ac474b494a08deb330a0b6d 100644 (file)
@@ -24,6 +24,7 @@ import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
@@ -41,28 +42,33 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -96,6 +102,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // A data store could be of type config/operational
     private final String type;
 
+    private final String shardManagerIdentifierString;
+
     private final ClusterWrapper cluster;
 
     private final Configuration configuration;
@@ -122,6 +130,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.datastoreContext = datastoreContext;
         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
         this.type = datastoreContext.getDataStoreType();
+        this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
@@ -133,7 +142,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
-        return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
+        return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider();
     }
 
     public static Props props(
@@ -158,8 +167,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public void handleCommand(Object message) throws Exception {
-        if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
-            findPrimary(FindPrimary.fromSerializable(message));
+        if (message  instanceof FindPrimary) {
+            findPrimary((FindPrimary)message);
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
@@ -180,20 +189,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
         } else if(message instanceof ShardNotInitializedTimeout) {
             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
-        } else if(message instanceof LeaderStateChanged) {
-            onLeaderStateChanged((LeaderStateChanged)message);
+        } else if(message instanceof ShardLeaderStateChanged) {
+            onLeaderStateChanged((ShardLeaderStateChanged)message);
         } else {
             unknownMessage(message);
         }
 
     }
 
-    private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+    private void checkReady(){
+        if (isReadyWithLeaderId()) {
+            LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+                    persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+            waitTillReadyCountdownLatch.countDown();
+        }
+    }
+
+    private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
 
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
+            shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
             shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+            checkReady();
         } else {
             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
         }
@@ -203,13 +223,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInfo = message.getShardInfo();
 
         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
-                shardInfo.getShardId());
+                shardInfo.getShardName());
 
         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
 
         if(!shardInfo.isShardInitialized()) {
-            message.getSender().tell(new ActorNotInitialized(), getSelf());
+            LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+            message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
         } else {
+            LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
         }
     }
@@ -235,14 +257,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setRole(roleChanged.getNewRole());
-
-            if (isReady()) {
-                LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                        persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-                waitTillReadyCountdownLatch.countDown();
-            }
-
+            checkReady();
             mBean.setSyncStatus(isInSync());
         }
     }
@@ -258,10 +273,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return null;
     }
 
-    private boolean isReady() {
+    private boolean isReadyWithLeaderId() {
         boolean isReady = true;
         for (ShardInformation info : localShards.values()) {
-            if(!info.isShardReady()){
+            if(!info.isShardReadyWithLeaderId()){
                 isReady = false;
                 break;
             }
@@ -297,7 +312,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void markShardAsInitialized(String shardName) {
-        LOG.debug("Initializing shard [{}]", shardName);
+        LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
 
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
@@ -367,6 +382,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
+                LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                         datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
@@ -375,8 +392,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
 
             } else if (!shardInformation.isShardInitialized()) {
-                getSender().tell(new ActorNotInitialized(), getSelf());
+                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
+                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
             } else {
+                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
             }
 
@@ -392,13 +413,26 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 "recovering and a leader is being elected. Try again later.", shardId));
     }
 
+    private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+        return new NotInitializedException(String.format(
+                "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
+    }
+
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
+        String memberName = message.member().roles().head();
+
+        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
         memberNameToAddress.remove(message.member().roles().head());
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
         String memberName = message.member().roles().head();
 
+        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
         memberNameToAddress.put(memberName, message.member().address());
 
         for(ShardInformation info : localShards.values()){
@@ -406,6 +440,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
                 getShardActorPath(shardName, memberName), getSelf());
         }
+
+        checkReady();
     }
 
     private void onDatastoreContext(DatastoreContext context) {
@@ -461,6 +497,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
+    @VisibleForTesting
+    protected ClusterWrapper getCluster() {
+        return cluster;
+    }
+
     @VisibleForTesting
     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
         return getContext().actorOf(Shard.props(info.getShardId(),
@@ -469,7 +510,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void findPrimary(FindPrimary message) {
+        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
+
         final String shardName = message.getShardName();
+        final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
 
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
@@ -477,10 +521,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
-                    Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
+                    String primaryPath = info.getSerializedLeaderActor();
+                    Object found = canReturnLocalShardState && info.isLeader() ?
+                            new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+                                new RemotePrimaryShardFound(primaryPath);
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: Found primary for {}: {}", shardName, found);
+                        LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
                     }
 
                     return found;
@@ -490,38 +537,36 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        List<String> members = configuration.getMembersFromShardName(shardName);
+        for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
+            if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
+                String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
 
-        if(cluster.getCurrentMemberName() != null) {
-            members.remove(cluster.getCurrentMemberName());
-        }
+                LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+                        shardName, path);
 
-        /**
-         * FIXME: Instead of sending remote shard actor path back to sender,
-         * forward FindPrimary message to remote shard manager
-         */
-        // There is no way for us to figure out the primary (for now) so assume
-        // that one of the remote nodes is a primary
-        for(String memberName : members) {
-            Address address = memberNameToAddress.get(memberName);
-            if(address != null){
-                String path =
-                    getShardActorPath(shardName, memberName);
-                getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+                getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
+                        message.isWaitUntilReady()), getContext());
                 return;
             }
         }
-        getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+
+        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
+
+        getSender().tell(new PrimaryNotFoundException(
+                String.format("No primary shard found for %s.", shardName)), getSelf());
+    }
+
+    private StringBuilder getShardManagerActorPathBuilder(Address address) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
+        return builder;
     }
 
     private String getShardActorPath(String shardName, String memberName) {
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
-            StringBuilder builder = new StringBuilder();
-            builder.append(address.toString())
-                .append("/user/")
-                .append(ShardManagerIdentifier.builder().type(type).build().toString())
-                .append("/")
+            StringBuilder builder = getShardManagerActorPathBuilder(address);
+            builder.append("/")
                 .append(getShardIdentifier(memberName, shardName));
             return builder.toString();
         }
@@ -628,6 +673,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private ActorRef actor;
         private ActorPath actorPath;
         private final Map<String, String> peerAddresses;
+        private Optional<DataTree> localShardDataTree;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
@@ -666,6 +712,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardId;
         }
 
+        void setLocalDataTree(Optional<DataTree> localShardDataTree) {
+            this.localShardDataTree = localShardDataTree;
+        }
+
+        Optional<DataTree> getLocalShardDataTree() {
+            return localShardDataTree;
+        }
+
         Map<String, String> getPeerAddresses() {
             return peerAddresses;
         }
@@ -694,7 +748,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId));
+            return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
         }
 
         boolean isShardInitialized() {