Bug 4105: Pass ModuleShardConfiguration with CreateShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 426a2e0934f173560647a13569b22d1e06f632b2..60894f5bc9723059ef7441aa79357084a00f2345 100644 (file)
@@ -11,51 +11,71 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
+import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
-import akka.japi.Procedure;
 import akka.persistence.RecoveryCompleted;
-import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
+import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
+import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The ShardManager has the following jobs,
@@ -68,7 +88,7 @@ import scala.concurrent.duration.Duration;
  */
 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
-    private final Logger LOG = LoggerFactory.getLogger(getClass());
+    private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
@@ -84,32 +104,39 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // A data store could be of type config/operational
     private final String type;
 
+    private final String shardManagerIdentifierString;
+
     private final ClusterWrapper cluster;
 
     private final Configuration configuration;
 
     private final String shardDispatcherPath;
 
-    private ShardManagerInfoMBean mBean;
+    private ShardManagerInfo mBean;
+
+    private DatastoreContext datastoreContext;
 
-    private final DatastoreContext datastoreContext;
+    private final CountDownLatch waitTillReadyCountdownLatch;
 
-    private Collection<String> knownModules = Collections.emptySet();
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-    private final DataPersistenceProvider dataPersistenceProvider;
+    private SchemaContext schemaContext;
 
     /**
      */
     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext) {
+            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
+            PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
         this.datastoreContext = datastoreContext;
-        this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
         this.type = datastoreContext.getDataStoreType();
+        this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+        this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+        this.primaryShardInfoCache = primaryShardInfoCache;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -117,25 +144,33 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         createLocalShards();
     }
 
-    protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
-        return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
-    }
-
     public static Props props(
         final ClusterWrapper cluster,
         final Configuration configuration,
-        final DatastoreContext datastoreContext) {
+        final DatastoreContext datastoreContext,
+        final CountDownLatch waitTillReadyCountdownLatch,
+        final PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
+        Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+        Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
+                waitTillReadyCountdownLatch, primaryShardInfoCache));
+    }
+
+    @Override
+    public void postStop() {
+        LOG.info("Stopping ShardManager");
 
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
+        mBean.unregisterMBean();
     }
 
     @Override
     public void handleCommand(Object message) throws Exception {
-        if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
-            findPrimary(FindPrimary.fromSerializable(message));
+        if (message  instanceof FindPrimary) {
+            findPrimary((FindPrimary)message);
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
@@ -144,16 +179,178 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onActorInitialized(message);
         } else if (message instanceof ClusterEvent.MemberUp){
             memberUp((ClusterEvent.MemberUp) message);
+        } else if (message instanceof ClusterEvent.MemberExited){
+            memberExited((ClusterEvent.MemberExited) message);
         } else if(message instanceof ClusterEvent.MemberRemoved) {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
-            ignoreMessage(message);
-        } else{
+            memberUnreachable((ClusterEvent.UnreachableMember)message);
+        } else if(message instanceof ClusterEvent.ReachableMember) {
+            memberReachable((ClusterEvent.ReachableMember) message);
+        } else if(message instanceof DatastoreContext) {
+            onDatastoreContext((DatastoreContext)message);
+        } else if(message instanceof RoleChangeNotification) {
+            onRoleChangeNotification((RoleChangeNotification) message);
+        } else if(message instanceof FollowerInitialSyncUpStatus){
+            onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
+        } else if(message instanceof ShardNotInitializedTimeout) {
+            onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
+        } else if(message instanceof ShardLeaderStateChanged) {
+            onLeaderStateChanged((ShardLeaderStateChanged) message);
+        } else if(message instanceof SwitchShardBehavior){
+            onSwitchShardBehavior((SwitchShardBehavior) message);
+        } else if(message instanceof CreateShard) {
+            onCreateShard((CreateShard)message);
+        } else {
             unknownMessage(message);
         }
 
     }
 
+    private void onCreateShard(CreateShard createShard) {
+        Object reply;
+        try {
+            ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+            if(localShards.containsKey(moduleShardConfig.getShardName())) {
+                throw new IllegalStateException(String.format("Shard with name %s already exists",
+                        moduleShardConfig.getShardName()));
+            }
+
+            configuration.addModuleShardConfiguration(moduleShardConfig);
+
+            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
+            Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
+                    moduleShardConfig.getShardMemberNames()*/);
+
+            LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+                    moduleShardConfig.getShardMemberNames(), peerAddresses);
+
+            DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+            if(shardDatastoreContext == null) {
+                shardDatastoreContext = datastoreContext;
+            }
+
+            ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
+                    shardDatastoreContext, createShard.getShardPropsCreator());
+            localShards.put(info.getShardName(), info);
+
+            mBean.addLocalShard(shardId.toString());
+
+            if(schemaContext != null) {
+                info.setActor(newShardActor(schemaContext, info));
+            }
+
+            reply = new CreateShardReply();
+        } catch (Exception e) {
+            LOG.error("onCreateShard failed", e);
+            reply = new akka.actor.Status.Failure(e);
+        }
+
+        if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+            getSender().tell(reply, getSelf());
+        }
+    }
+
+    private void checkReady(){
+        if (isReadyWithLeaderId()) {
+            LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+                    persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+            waitTillReadyCountdownLatch.countDown();
+        }
+    }
+
+    private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
+        LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
+
+        ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
+        if(shardInformation != null) {
+            shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
+            shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
+            if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+                primaryShardInfoCache.remove(shardInformation.getShardName());
+            }
+
+            checkReady();
+        } else {
+            LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
+        }
+    }
+
+    private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+        ShardInformation shardInfo = message.getShardInfo();
+
+        LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
+                shardInfo.getShardName());
+
+        shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
+
+        if(!shardInfo.isShardInitialized()) {
+            LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+            message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
+        } else {
+            LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
+            message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+        }
+    }
+
+    private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
+        LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
+                status.getName(), status.isInitialSyncDone());
+
+        ShardInformation shardInformation = findShardInformation(status.getName());
+
+        if(shardInformation != null) {
+            shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
+
+            mBean.setSyncStatus(isInSync());
+        }
+
+    }
+
+    private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
+        LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
+                roleChanged.getOldRole(), roleChanged.getNewRole());
+
+        ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
+        if(shardInformation != null) {
+            shardInformation.setRole(roleChanged.getNewRole());
+            checkReady();
+            mBean.setSyncStatus(isInSync());
+        }
+    }
+
+
+    private ShardInformation findShardInformation(String memberId) {
+        for(ShardInformation info : localShards.values()){
+            if(info.getShardId().toString().equals(memberId)){
+                return info;
+            }
+        }
+
+        return null;
+    }
+
+    private boolean isReadyWithLeaderId() {
+        boolean isReady = true;
+        for (ShardInformation info : localShards.values()) {
+            if(!info.isShardReadyWithLeaderId()){
+                isReady = false;
+                break;
+            }
+        }
+        return isReady;
+    }
+
+    private boolean isInSync(){
+        for (ShardInformation info : localShards.values()) {
+            if(!info.isInSync()){
+                return false;
+            }
+        }
+        return true;
+    }
+
     private void onActorInitialized(Object message) {
         final ActorRef sender = getSender();
 
@@ -168,39 +365,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (shardId.getShardName() == null) {
             return;
         }
+
         markShardAsInitialized(shardId.getShardName());
     }
 
     private void markShardAsInitialized(String shardName) {
-        LOG.debug("Initializing shard [{}]", shardName);
+        LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
+
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
             shardInformation.setActorInitialized();
+
+            shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
         }
     }
 
     @Override
     protected void handleRecover(Object message) throws Exception {
-        if(dataPersistenceProvider.isRecoveryApplicable()) {
-            if (message instanceof SchemaContextModules) {
-                SchemaContextModules msg = (SchemaContextModules) message;
-                knownModules = ImmutableSet.copyOf(msg.getModules());
-            } else if (message instanceof RecoveryFailure) {
-                RecoveryFailure failure = (RecoveryFailure) message;
-                LOG.error("Recovery failed", failure.cause());
-            } else if (message instanceof RecoveryCompleted) {
-                LOG.info("Recovery complete : {}", persistenceId());
-
-                // Delete all the messages from the akka journal except the last one
-                deleteMessages(lastSequenceNr() - 1);
-            }
-        } else {
-            if (message instanceof RecoveryCompleted) {
-                LOG.info("Recovery complete : {}", persistenceId());
+        if (message instanceof RecoveryCompleted) {
+            LOG.info("Recovery complete : {}", persistenceId());
 
-                // Delete all the messages from the akka journal
-                deleteMessages(lastSequenceNr());
-            }
+            // We no longer persist SchemaContext modules so delete all the prior messages from the akka
+            // journal on upgrade from Helium.
+            deleteMessages(lastSequenceNr());
         }
     }
 
@@ -212,7 +399,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
             @Override
             public Object get() {
                 return new LocalShardFound(shardInformation.getActor());
@@ -220,20 +407,50 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         });
     }
 
-    private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
-            final Supplier<Object> messageSupplier) {
-        if (!shardInformation.isShardInitialized()) {
-            if(waitUntilInitialized) {
+    private void sendResponse(ShardInformation shardInformation, boolean doWait,
+            boolean wantShardReady, final Supplier<Object> messageSupplier) {
+        if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
+            if(doWait) {
                 final ActorRef sender = getSender();
                 final ActorRef self = self();
-                shardInformation.addRunnableOnInitialized(new Runnable() {
+
+                Runnable replyRunnable = new Runnable() {
                     @Override
                     public void run() {
                         sender.tell(messageSupplier.get(), self);
                     }
-                });
+                };
+
+                OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
+                    new OnShardInitialized(replyRunnable);
+
+                shardInformation.addOnShardInitialized(onShardInitialized);
+
+                LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
+
+                FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+                if(shardInformation.isShardInitialized()) {
+                    // If the shard is already initialized then we'll wait enough time for the shard to
+                    // elect a leader, ie 2 times the election timeout.
+                    timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+                            .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
+                }
+
+                Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
+                        timeout, getSelf(),
+                        new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
+                        getContext().dispatcher(), getSelf());
+
+                onShardInitialized.setTimeoutSchedule(timeoutSchedule);
+
+            } else if (!shardInformation.isShardInitialized()) {
+                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
+                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
             } else {
-                getSender().tell(new ActorNotInitialized(), getSelf());
+                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
+                getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
             }
 
             return;
@@ -242,116 +459,212 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         getSender().tell(messageSupplier.get(), getSelf());
     }
 
+    private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+        return new NoShardLeaderException(null, shardId.toString());
+    }
+
+    private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+        return new NotInitializedException(String.format(
+                "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
+    }
+
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
-        memberNameToAddress.remove(message.member().roles().head());
+        String memberName = message.member().roles().head();
+
+        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        memberNameToAddress.remove(memberName);
+
+        for(ShardInformation info : localShards.values()){
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
+        }
+    }
+
+    private void memberExited(ClusterEvent.MemberExited message) {
+        String memberName = message.member().roles().head();
+
+        LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        memberNameToAddress.remove(memberName);
+
+        for(ShardInformation info : localShards.values()){
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
+        }
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
         String memberName = message.member().roles().head();
 
+        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
         memberNameToAddress.put(memberName, message.member().address());
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
-            info.updatePeerAddress(getShardIdentifier(memberName, shardName),
-                getShardActorPath(shardName, memberName));
+            String peerId = getShardIdentifier(memberName, shardName).toString();
+            info.updatePeerAddress(peerId, getShardActorPath(shardName, memberName), getSelf());
+
+            info.peerUp(memberName, peerId, getSelf());
         }
+
+        checkReady();
     }
 
-    /**
-     * Notifies all the local shards of a change in the schema context
-     *
-     * @param message
-     */
-    private void updateSchemaContext(final Object message) {
-        final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+    private void memberReachable(ClusterEvent.ReachableMember message) {
+        String memberName = message.member().roles().head();
+        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        markMemberAvailable(memberName);
+    }
+
+    private void memberUnreachable(ClusterEvent.UnreachableMember message) {
+        String memberName = message.member().roles().head();
+        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        markMemberUnavailable(memberName);
+    }
 
-        Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
-        Set<String> newModules = new HashSet<>(128);
+    private void markMemberUnavailable(final String memberName) {
+        for(ShardInformation info : localShards.values()){
+            String leaderId = info.getLeaderId();
+            if(leaderId != null && leaderId.contains(memberName)) {
+                LOG.debug("Marking Leader {} as unavailable.", leaderId);
+                info.setLeaderAvailable(false);
 
-        for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
-            String s = moduleIdentifier.getNamespace().toString();
-            newModules.add(s);
+                primaryShardInfoCache.remove(info.getShardName());
+            }
+
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
+    }
 
-        if(newModules.containsAll(knownModules)) {
+    private void markMemberAvailable(final String memberName) {
+        for(ShardInformation info : localShards.values()){
+            String leaderId = info.getLeaderId();
+            if(leaderId != null && leaderId.contains(memberName)) {
+                LOG.debug("Marking Leader {} as available.", leaderId);
+                info.setLeaderAvailable(true);
+            }
 
-            LOG.debug("New SchemaContext has a super set of current knownModules - persisting info");
+            info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
+        }
+    }
 
-            knownModules = ImmutableSet.copyOf(newModules);
+    private void onDatastoreContext(DatastoreContext context) {
+        datastoreContext = context;
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() != null) {
+                info.getActor().tell(datastoreContext, getSelf());
+            }
+        }
+    }
 
-            dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
+    private void onSwitchShardBehavior(SwitchShardBehavior message) {
+        ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
 
-                @Override
-                public void apply(SchemaContextModules param) throws Exception {
-                    LOG.debug("Sending new SchemaContext to Shards");
-                    for (ShardInformation info : localShards.values()) {
-                        if (info.getActor() == null) {
-                            info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
-                                            info.getPeerAddresses(), datastoreContext, schemaContext)
-                                            .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
-                        } else {
-                            info.getActor().tell(message, getSelf());
-                        }
-                    }
-                }
+        ShardInformation shardInformation = localShards.get(identifier.getShardName());
 
-            });
+        if(shardInformation != null && shardInformation.getActor() != null) {
+            shardInformation.getActor().tell(
+                    new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
         } else {
-            LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}",
-                    newModules, knownModules);
+            LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
+                    message.getShardName(), message.getNewState());
+        }
+    }
+
+    /**
+     * Notifies all the local shards of a change in the schema context
+     *
+     * @param message
+     */
+    private void updateSchemaContext(final Object message) {
+        schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+        LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
+
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() == null) {
+                LOG.debug("Creating Shard {}", info.getShardId());
+                info.setActor(newShardActor(schemaContext, info));
+            } else {
+                info.getActor().tell(message, getSelf());
+            }
         }
+    }
+
+    @VisibleForTesting
+    protected ClusterWrapper getCluster() {
+        return cluster;
+    }
 
+    @VisibleForTesting
+    protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
+        return getContext().actorOf(info.newProps(schemaContext)
+                        .withDispatcher(shardDispatcherPath), info.getShardId().toString());
     }
 
     private void findPrimary(FindPrimary message) {
-        String shardName = message.getShardName();
+        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
+
+        final String shardName = message.getShardName();
+        final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
 
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
         if (info != null) {
-            sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
+            sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
-                    return new PrimaryFound(info.getActorPath().toString()).toSerializable();
+                    String primaryPath = info.getSerializedLeaderActor();
+                    Object found = canReturnLocalShardState && info.isLeader() ?
+                            new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+                                new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
+
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+                    }
+
+                    return found;
                 }
             });
 
             return;
         }
 
-        List<String> members = configuration.getMembersFromShardName(shardName);
+        for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
+            if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
+                String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
 
-        if(cluster.getCurrentMemberName() != null) {
-            members.remove(cluster.getCurrentMemberName());
-        }
+                LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+                        shardName, path);
 
-        /**
-         * FIXME: Instead of sending remote shard actor path back to sender,
-         * forward FindPrimary message to remote shard manager
-         */
-        // There is no way for us to figure out the primary (for now) so assume
-        // that one of the remote nodes is a primary
-        for(String memberName : members) {
-            Address address = memberNameToAddress.get(memberName);
-            if(address != null){
-                String path =
-                    getShardActorPath(shardName, memberName);
-                getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+                getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
+                        message.isWaitUntilReady()), getContext());
                 return;
             }
         }
-        getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+
+        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
+
+        getSender().tell(new PrimaryNotFoundException(
+                String.format("No primary shard found for %s.", shardName)), getSelf());
+    }
+
+    private StringBuilder getShardManagerActorPathBuilder(Address address) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
+        return builder;
     }
 
     private String getShardActorPath(String shardName, String memberName) {
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
-            StringBuilder builder = new StringBuilder();
-            builder.append(address.toString())
-                .append("/user/")
-                .append(ShardManagerIdentifier.builder().type(type).build().toString())
-                .append("/")
+            StringBuilder builder = getShardManagerActorPathBuilder(address);
+            builder.append("/")
                 .append(getShardIdentifier(memberName, shardName));
             return builder.toString();
         }
@@ -377,43 +690,40 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      */
     private void createLocalShards() {
         String memberName = this.cluster.getCurrentMemberName();
-        List<String> memberShardNames =
-            this.configuration.getMemberShardNames(memberName);
+        Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
+        ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator();
         List<String> localShardActorNames = new ArrayList<>();
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
-            Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
+            Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
-            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
+            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
+                    shardPropsCreator));
         }
 
-        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+
+        mBean.setShardManager(this);
     }
 
     /**
      * Given the name of the shard find the addresses of all it's peers
      *
      * @param shardName
-     * @return
      */
-    private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
-
-        Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
-
-        List<String> members =
-            this.configuration.getMembersFromShardName(shardName);
+    private Map<String, String> getPeerAddresses(String shardName) {
+        Collection<String> members = configuration.getMembersFromShardName(shardName);
+        Map<String, String> peerAddresses = new HashMap<>();
 
         String currentMemberName = this.cluster.getCurrentMemberName();
 
-        for(String memberName : members){
-            if(!currentMemberName.equals(memberName)){
-                ShardIdentifier shardId = getShardIdentifier(memberName,
-                    shardName);
-                String path =
-                    getShardActorPath(shardName, currentMemberName);
-                peerAddresses.put(shardId, path);
+        for(String memberName : members) {
+            if(!currentMemberName.equals(memberName)) {
+                ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+                String path = getShardActorPath(shardName, memberName);
+                peerAddresses.put(shardId.toString(), path);
             }
         }
         return peerAddresses;
@@ -440,32 +750,45 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
-    Collection<String> getKnownModules() {
-        return knownModules;
+    ShardManagerInfoMBean getMBean(){
+        return mBean;
     }
 
     @VisibleForTesting
-    DataPersistenceProvider getDataPersistenceProvider() {
-        return dataPersistenceProvider;
-    }
-
-    private class ShardInformation {
+    protected static class ShardInformation {
         private final ShardIdentifier shardId;
         private final String shardName;
         private ActorRef actor;
         private ActorPath actorPath;
-        private final Map<ShardIdentifier, String> peerAddresses;
+        private final Map<String, String> peerAddresses;
+        private Optional<DataTree> localShardDataTree;
+        private boolean leaderAvailable = false;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
 
-        private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+        private boolean followerSyncStatus = false;
+
+        private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
+        private String role ;
+        private String leaderId;
+        private short leaderVersion;
+
+        private final DatastoreContext datastoreContext;
+        private final ShardPropsCreator shardPropsCreator;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
-                Map<ShardIdentifier, String> peerAddresses) {
+                Map<String, String> peerAddresses, DatastoreContext datastoreContext,
+                ShardPropsCreator shardPropsCreator) {
             this.shardName = shardName;
             this.shardId = shardId;
             this.peerAddresses = peerAddresses;
+            this.datastoreContext = datastoreContext;
+            this.shardPropsCreator = shardPropsCreator;
+        }
+
+        Props newProps(SchemaContext schemaContext) {
+            return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext);
         }
 
         String getShardName() {
@@ -489,11 +812,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardId;
         }
 
-        Map<ShardIdentifier, String> getPeerAddresses() {
+        void setLocalDataTree(Optional<DataTree> localShardDataTree) {
+            this.localShardDataTree = localShardDataTree;
+        }
+
+        Optional<DataTree> getLocalShardDataTree() {
+            return localShardDataTree;
+        }
+
+        Map<String, String> getPeerAddresses() {
             return peerAddresses;
         }
 
-        void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+        void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
                 peerAddress);
             if(peerAddresses.containsKey(peerId)){
@@ -505,27 +836,134 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                                 peerId, peerAddress, actor.path());
                     }
 
-                    actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
+                    actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
                 }
+
+                notifyOnShardInitializedCallbacks();
+            }
+        }
+
+        void peerDown(String memberName, String peerId, ActorRef sender) {
+            if(peerAddresses.containsKey(peerId) && actor != null) {
+                actor.tell(new PeerDown(memberName, peerId), sender);
             }
         }
 
+        void peerUp(String memberName, String peerId, ActorRef sender) {
+            if(peerAddresses.containsKey(peerId) && actor != null) {
+                actor.tell(new PeerUp(memberName, peerId), sender);
+            }
+        }
+
+        boolean isShardReady() {
+            return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
+        }
+
+        boolean isShardReadyWithLeaderId() {
+            return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
+                    (isLeader() || peerAddresses.get(leaderId) != null);
+        }
+
         boolean isShardInitialized() {
             return getActor() != null && actorInitialized;
         }
 
+        boolean isLeader() {
+            return Objects.equal(leaderId, shardId.toString());
+        }
+
+        String getSerializedLeaderActor() {
+            if(isLeader()) {
+                return Serialization.serializedActorPath(getActor());
+            } else {
+                return peerAddresses.get(leaderId);
+            }
+        }
+
         void setActorInitialized() {
+            LOG.debug("Shard {} is initialized", shardId);
+
             this.actorInitialized = true;
 
-            for(Runnable runnable: runnablesOnInitialized) {
-                runnable.run();
+            notifyOnShardInitializedCallbacks();
+        }
+
+        private void notifyOnShardInitializedCallbacks() {
+            if(onShardInitializedSet.isEmpty()) {
+                return;
+            }
+
+            boolean ready = isShardReadyWithLeaderId();
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
+                        ready ? "ready" : "initialized", onShardInitializedSet.size());
+            }
+
+            Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
+            while(iter.hasNext()) {
+                OnShardInitialized onShardInitialized = iter.next();
+                if(!(onShardInitialized instanceof OnShardReady) || ready) {
+                    iter.remove();
+                    onShardInitialized.getTimeoutSchedule().cancel();
+                    onShardInitialized.getReplyRunnable().run();
+                }
+            }
+        }
+
+        void addOnShardInitialized(OnShardInitialized onShardInitialized) {
+            onShardInitializedSet.add(onShardInitialized);
+        }
+
+        void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
+            onShardInitializedSet.remove(onShardInitialized);
+        }
+
+        void setRole(String newRole) {
+            this.role = newRole;
+
+            notifyOnShardInitializedCallbacks();
+        }
+
+        void setFollowerSyncStatus(boolean syncStatus){
+            this.followerSyncStatus = syncStatus;
+        }
+
+        boolean isInSync(){
+            if(RaftState.Follower.name().equals(this.role)){
+                return followerSyncStatus;
+            } else if(RaftState.Leader.name().equals(this.role)){
+                return true;
+            }
+
+            return false;
+        }
+
+        boolean setLeaderId(String leaderId) {
+            boolean changed = !Objects.equal(this.leaderId, leaderId);
+            this.leaderId = leaderId;
+            if(leaderId != null) {
+                this.leaderAvailable = true;
             }
+            notifyOnShardInitializedCallbacks();
 
-            runnablesOnInitialized.clear();
+            return changed;
         }
 
-        void addRunnableOnInitialized(Runnable runnable) {
-            runnablesOnInitialized.add(runnable);
+        String getLeaderId() {
+            return leaderId;
+        }
+
+        void setLeaderAvailable(boolean leaderAvailable) {
+            this.leaderAvailable = leaderAvailable;
+        }
+
+        short getLeaderVersion() {
+            return leaderVersion;
+        }
+
+        void setLeaderVersion(short leaderVersion) {
+            this.leaderVersion = leaderVersion;
         }
     }
 
@@ -535,20 +973,81 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ClusterWrapper cluster;
         final Configuration configuration;
         final DatastoreContext datastoreContext;
+        private final CountDownLatch waitTillReadyCountdownLatch;
+        private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-        ShardManagerCreator(ClusterWrapper cluster,
-                Configuration configuration, DatastoreContext datastoreContext) {
+        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
+                CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
             this.cluster = cluster;
             this.configuration = configuration;
             this.datastoreContext = datastoreContext;
+            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            this.primaryShardInfoCache = primaryShardInfoCache;
         }
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContext);
+            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
+                    primaryShardInfoCache);
+        }
+    }
+
+    private static class OnShardInitialized {
+        private final Runnable replyRunnable;
+        private Cancellable timeoutSchedule;
+
+        OnShardInitialized(Runnable replyRunnable) {
+            this.replyRunnable = replyRunnable;
+        }
+
+        Runnable getReplyRunnable() {
+            return replyRunnable;
+        }
+
+        Cancellable getTimeoutSchedule() {
+            return timeoutSchedule;
+        }
+
+        void setTimeoutSchedule(Cancellable timeoutSchedule) {
+            this.timeoutSchedule = timeoutSchedule;
         }
     }
 
+    private static class OnShardReady extends OnShardInitialized {
+        OnShardReady(Runnable replyRunnable) {
+            super(replyRunnable);
+        }
+    }
+
+    private static class ShardNotInitializedTimeout {
+        private final ActorRef sender;
+        private final ShardInformation shardInfo;
+        private final OnShardInitialized onShardInitialized;
+
+        ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+            this.sender = sender;
+            this.shardInfo = shardInfo;
+            this.onShardInitialized = onShardInitialized;
+        }
+
+        ActorRef getSender() {
+            return sender;
+        }
+
+        ShardInformation getShardInfo() {
+            return shardInfo;
+        }
+
+        OnShardInitialized getOnShardInitialized() {
+            return onShardInitialized;
+        }
+    }
+
+    /**
+     * We no longer persist SchemaContextModules but keep this class around for now for backwards
+     * compatibility so we don't get de-serialization failures on upgrade from Helium.
+     */
+    @Deprecated
     static class SchemaContextModules implements Serializable {
         private static final long serialVersionUID = -8884620101025936590L;