Bug 4149: Implement per-shard DatastoreContext settings
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index f4fa7b3a97e8617f8ac474b494a08deb330a0b6d..4f3d4aa7f931b1af11e1198de001d02831ac6d63 100644 (file)
@@ -8,72 +8,84 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.japi.Function;
-import akka.japi.Procedure;
 import akka.persistence.RecoveryCompleted;
-import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
+import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.NonPersistentDataProvider;
-import org.opendaylight.controller.cluster.PersistentDataProvider;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
+import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The ShardManager has the following jobs,
@@ -88,11 +100,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
 
-    // Stores a mapping between a member name and the address of the member
-    // Member names look like "member-1", "member-2" etc and are as specified
-    // in configuration
-    private final Map<String, Address> memberNameToAddress = new HashMap<>();
-
     // Stores a mapping between a shard name and it's corresponding information
     // Shard names look like inventory, topology etc and are as specified in
     // configuration
@@ -102,8 +109,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // A data store could be of type config/operational
     private final String type;
 
-    private final String shardManagerIdentifierString;
-
     private final ClusterWrapper cluster;
 
     private final Configuration configuration;
@@ -112,28 +117,32 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private ShardManagerInfo mBean;
 
-    private DatastoreContext datastoreContext;
+    private DatastoreContextFactory datastoreContextFactory;
 
-    private Collection<String> knownModules = Collections.emptySet();
+    private final CountDownLatch waitTillReadyCountdownLatch;
 
-    private final DataPersistenceProvider dataPersistenceProvider;
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-    private final CountDownLatch waitTillReadyCountdownLatch;
+    private final ShardPeerAddressResolver peerAddressResolver;
+
+    private SchemaContext schemaContext;
 
     /**
      */
     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+            DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
+            PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
-        this.datastoreContext = datastoreContext;
-        this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
-        this.type = datastoreContext.getDataStoreType();
-        this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
+        this.datastoreContextFactory = datastoreContextFactory;
+        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+        this.primaryShardInfoCache = primaryShardInfoCache;
+
+        peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -141,21 +150,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         createLocalShards();
     }
 
-    protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
-        return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider();
-    }
-
     public static Props props(
-        final ClusterWrapper cluster,
-        final Configuration configuration,
-        final DatastoreContext datastoreContext,
-        final CountDownLatch waitTillReadyCountdownLatch) {
+            final ClusterWrapper cluster,
+            final Configuration configuration,
+            final DatastoreContextFactory datastoreContextFactory,
+            final CountDownLatch waitTillReadyCountdownLatch,
+            final PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+        Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
 
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch));
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory,
+                waitTillReadyCountdownLatch, primaryShardInfoCache));
     }
 
     @Override
@@ -177,12 +185,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onActorInitialized(message);
         } else if (message instanceof ClusterEvent.MemberUp){
             memberUp((ClusterEvent.MemberUp) message);
+        } else if (message instanceof ClusterEvent.MemberExited){
+            memberExited((ClusterEvent.MemberExited) message);
         } else if(message instanceof ClusterEvent.MemberRemoved) {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
-            ignoreMessage(message);
-        } else if(message instanceof DatastoreContext) {
-            onDatastoreContext((DatastoreContext)message);
+            memberUnreachable((ClusterEvent.UnreachableMember)message);
+        } else if(message instanceof ClusterEvent.ReachableMember) {
+            memberReachable((ClusterEvent.ReachableMember) message);
+        } else if(message instanceof DatastoreContextFactory) {
+            onDatastoreContextFactory((DatastoreContextFactory)message);
         } else if(message instanceof RoleChangeNotification) {
             onRoleChangeNotification((RoleChangeNotification) message);
         } else if(message instanceof FollowerInitialSyncUpStatus){
@@ -190,13 +202,77 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ShardNotInitializedTimeout) {
             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
         } else if(message instanceof ShardLeaderStateChanged) {
-            onLeaderStateChanged((ShardLeaderStateChanged)message);
+            onLeaderStateChanged((ShardLeaderStateChanged) message);
+        } else if(message instanceof SwitchShardBehavior){
+            onSwitchShardBehavior((SwitchShardBehavior) message);
+        } else if(message instanceof CreateShard) {
+            onCreateShard((CreateShard)message);
+        } else if(message instanceof AddShardReplica){
+            onAddShardReplica((AddShardReplica)message);
+        } else if(message instanceof RemoveShardReplica){
+            onRemoveShardReplica((RemoveShardReplica)message);
         } else {
             unknownMessage(message);
         }
 
     }
 
+    private void onCreateShard(CreateShard createShard) {
+        Object reply;
+        try {
+            ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+            if(localShards.containsKey(moduleShardConfig.getShardName())) {
+                throw new IllegalStateException(String.format("Shard with name %s already exists",
+                        moduleShardConfig.getShardName()));
+            }
+
+            configuration.addModuleShardConfiguration(moduleShardConfig);
+
+            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
+            Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
+                    moduleShardConfig.getShardMemberNames()*/);
+
+            LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+                    moduleShardConfig.getShardMemberNames(), peerAddresses);
+
+            DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+            if(shardDatastoreContext == null) {
+                shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
+            } else {
+                shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+                        peerAddressResolver).build();
+            }
+
+            ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
+                    shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
+            localShards.put(info.getShardName(), info);
+
+            mBean.addLocalShard(shardId.toString());
+
+            if(schemaContext != null) {
+                info.setActor(newShardActor(schemaContext, info));
+            }
+
+            reply = new CreateShardReply();
+        } catch (Exception e) {
+            LOG.error("onCreateShard failed", e);
+            reply = new akka.actor.Status.Failure(e);
+        }
+
+        if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+            getSender().tell(reply, getSelf());
+        }
+    }
+
+    private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
+        return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
+                shardPeerAddressResolver(peerAddressResolver);
+    }
+
+    private DatastoreContext newShardDatastoreContext(String shardName) {
+        return newShardDatastoreContextBuilder(shardName).build();
+    }
+
     private void checkReady(){
         if (isReadyWithLeaderId()) {
             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
@@ -212,7 +288,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
-            shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+            shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
+            if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+                primaryShardInfoCache.remove(shardInformation.getShardName());
+            }
+
             checkReady();
         } else {
             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
@@ -324,26 +404,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     protected void handleRecover(Object message) throws Exception {
-        if(dataPersistenceProvider.isRecoveryApplicable()) {
-            if (message instanceof SchemaContextModules) {
-                SchemaContextModules msg = (SchemaContextModules) message;
-                knownModules = ImmutableSet.copyOf(msg.getModules());
-            } else if (message instanceof RecoveryFailure) {
-                RecoveryFailure failure = (RecoveryFailure) message;
-                LOG.error("Recovery failed", failure.cause());
-            } else if (message instanceof RecoveryCompleted) {
-                LOG.info("Recovery complete : {}", persistenceId());
-
-                // Delete all the messages from the akka journal except the last one
-                deleteMessages(lastSequenceNr() - 1);
-            }
-        } else {
-            if (message instanceof RecoveryCompleted) {
-                LOG.info("Recovery complete : {}", persistenceId());
+        if (message instanceof RecoveryCompleted) {
+            LOG.info("Recovery complete : {}", persistenceId());
 
-                // Delete all the messages from the akka journal
-                deleteMessages(lastSequenceNr());
-            }
+            // We no longer persist SchemaContext modules so delete all the prior messages from the akka
+            // journal on upgrade from Helium.
+            deleteMessages(lastSequenceNr());
         }
     }
 
@@ -384,8 +450,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
 
+                FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
+                if(shardInformation.isShardInitialized()) {
+                    // If the shard is already initialized then we'll wait enough time for the shard to
+                    // elect a leader, ie 2 times the election timeout.
+                    timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
+                            .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
+                }
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
-                        datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+                        timeout, getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                         getContext().dispatcher(), getSelf());
 
@@ -407,13 +481,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         getSender().tell(messageSupplier.get(), getSelf());
     }
 
-    private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
-        return new NoShardLeaderException(String.format(
-                "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
-                "recovering and a leader is being elected. Try again later.", shardId));
+    private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+        return new NoShardLeaderException(null, shardId.toString());
     }
 
-    private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+    private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
         return new NotInitializedException(String.format(
                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
     }
@@ -424,7 +496,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        memberNameToAddress.remove(message.member().roles().head());
+        peerAddressResolver.removePeerAddress(memberName);
+
+        for(ShardInformation info : localShards.values()){
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
+        }
+    }
+
+    private void memberExited(ClusterEvent.MemberExited message) {
+        String memberName = message.member().roles().head();
+
+        LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        peerAddressResolver.removePeerAddress(memberName);
+
+        for(ShardInformation info : localShards.values()){
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
+        }
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
@@ -433,68 +522,104 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        memberNameToAddress.put(memberName, message.member().address());
+        addPeerAddress(memberName, message.member().address());
+
+        checkReady();
+    }
+
+    private void addPeerAddress(String memberName, Address address) {
+        peerAddressResolver.addPeerAddress(memberName, address);
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
-            info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
-                getShardActorPath(shardName, memberName), getSelf());
+            String peerId = getShardIdentifier(memberName, shardName).toString();
+            info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
+
+            info.peerUp(memberName, peerId, getSelf());
         }
+    }
 
-        checkReady();
+    private void memberReachable(ClusterEvent.ReachableMember message) {
+        String memberName = message.member().roles().head();
+        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        addPeerAddress(memberName, message.member().address());
+
+        markMemberAvailable(memberName);
     }
 
-    private void onDatastoreContext(DatastoreContext context) {
-        datastoreContext = context;
-        for (ShardInformation info : localShards.values()) {
-            if (info.getActor() != null) {
-                info.getActor().tell(datastoreContext, getSelf());
-            }
-        }
+    private void memberUnreachable(ClusterEvent.UnreachableMember message) {
+        String memberName = message.member().roles().head();
+        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        markMemberUnavailable(memberName);
     }
 
-    /**
-     * Notifies all the local shards of a change in the schema context
-     *
-     * @param message
-     */
-    private void updateSchemaContext(final Object message) {
-        final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+    private void markMemberUnavailable(final String memberName) {
+        for(ShardInformation info : localShards.values()){
+            String leaderId = info.getLeaderId();
+            if(leaderId != null && leaderId.contains(memberName)) {
+                LOG.debug("Marking Leader {} as unavailable.", leaderId);
+                info.setLeaderAvailable(false);
 
-        Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
-        Set<String> newModules = new HashSet<>(128);
+                primaryShardInfoCache.remove(info.getShardName());
+            }
 
-        for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
-            String s = moduleIdentifier.getNamespace().toString();
-            newModules.add(s);
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
+    }
 
-        if(newModules.containsAll(knownModules)) {
+    private void markMemberAvailable(final String memberName) {
+        for(ShardInformation info : localShards.values()){
+            String leaderId = info.getLeaderId();
+            if(leaderId != null && leaderId.contains(memberName)) {
+                LOG.debug("Marking Leader {} as available.", leaderId);
+                info.setLeaderAvailable(true);
+            }
 
-            LOG.debug("New SchemaContext has a super set of current knownModules - persisting info");
+            info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
+        }
+    }
 
-            knownModules = ImmutableSet.copyOf(newModules);
+    private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+        datastoreContextFactory = factory;
+        for (ShardInformation info : localShards.values()) {
+            info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
+        }
+    }
 
-            dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
+    private void onSwitchShardBehavior(SwitchShardBehavior message) {
+        ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
 
-                @Override
-                public void apply(SchemaContextModules param) throws Exception {
-                    LOG.debug("Sending new SchemaContext to Shards");
-                    for (ShardInformation info : localShards.values()) {
-                        if (info.getActor() == null) {
-                            info.setActor(newShardActor(schemaContext, info));
-                        } else {
-                            info.getActor().tell(message, getSelf());
-                        }
-                    }
-                }
+        ShardInformation shardInformation = localShards.get(identifier.getShardName());
 
-            });
+        if(shardInformation != null && shardInformation.getActor() != null) {
+            shardInformation.getActor().tell(
+                    new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
         } else {
-            LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}",
-                    newModules, knownModules);
+            LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
+                    message.getShardName(), message.getNewState());
         }
+    }
 
+    /**
+     * Notifies all the local shards of a change in the schema context
+     *
+     * @param message
+     */
+    private void updateSchemaContext(final Object message) {
+        schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+        LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
+
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() == null) {
+                LOG.debug("Creating Shard {}", info.getShardId());
+                info.setActor(newShardActor(schemaContext, info));
+            } else {
+                info.getActor().tell(message, getSelf());
+            }
+        }
     }
 
     @VisibleForTesting
@@ -504,9 +629,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @VisibleForTesting
     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
-        return getContext().actorOf(Shard.props(info.getShardId(),
-                info.getPeerAddresses(), datastoreContext, schemaContext)
-                        .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+        return getContext().actorOf(info.newProps(schemaContext)
+                .withDispatcher(shardDispatcherPath), info.getShardId().toString());
     }
 
     private void findPrimary(FindPrimary message) {
@@ -524,30 +648,26 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     String primaryPath = info.getSerializedLeaderActor();
                     Object found = canReturnLocalShardState && info.isLeader() ?
                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
-                                new RemotePrimaryShardFound(primaryPath);
+                                new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
 
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
-                    }
+                            if(LOG.isDebugEnabled()) {
+                                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+                            }
 
-                    return found;
+                            return found;
                 }
             });
 
             return;
         }
 
-        for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
-            if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
-                String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
+        for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
+            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+                    shardName, address);
 
-                LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
-                        shardName, path);
-
-                getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
-                        message.isWaitUntilReady()), getContext());
-                return;
-            }
+            getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
+                    message.isWaitUntilReady()), getContext());
+            return;
         }
 
         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
@@ -556,23 +676,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 String.format("No primary shard found for %s.", shardName)), getSelf());
     }
 
-    private StringBuilder getShardManagerActorPathBuilder(Address address) {
-        StringBuilder builder = new StringBuilder();
-        builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
-        return builder;
-    }
-
-    private String getShardActorPath(String shardName, String memberName) {
-        Address address = memberNameToAddress.get(memberName);
-        if(address != null) {
-            StringBuilder builder = getShardManagerActorPathBuilder(address);
-            builder.append("/")
-                .append(getShardIdentifier(memberName, shardName));
-            return builder.toString();
-        }
-        return null;
-    }
-
     /**
      * Construct the name of the shard actor given the name of the member on
      * which the shard resides and the name of the shard
@@ -582,7 +685,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @return
      */
     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
-        return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
+        return peerAddressResolver.getShardIdentifier(memberName, shardName);
     }
 
     /**
@@ -592,40 +695,40 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      */
     private void createLocalShards() {
         String memberName = this.cluster.getCurrentMemberName();
-        List<String> memberShardNames =
-            this.configuration.getMemberShardNames(memberName);
+        Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
+        ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator();
         List<String> localShardActorNames = new ArrayList<>();
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
-            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
+            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
+                    newShardDatastoreContext(shardName), shardPropsCreator, peerAddressResolver));
         }
 
-        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
-                    datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames);
+
+        mBean.setShardManager(this);
     }
 
     /**
      * Given the name of the shard find the addresses of all it's peers
      *
      * @param shardName
-     * @return
      */
-    private Map<String, String> getPeerAddresses(String shardName){
-
+    private Map<String, String> getPeerAddresses(String shardName) {
+        Collection<String> members = configuration.getMembersFromShardName(shardName);
         Map<String, String> peerAddresses = new HashMap<>();
 
-        List<String> members = this.configuration.getMembersFromShardName(shardName);
-
         String currentMemberName = this.cluster.getCurrentMemberName();
 
-        for(String memberName : members){
-            if(!currentMemberName.equals(memberName)){
+        for(String memberName : members) {
+            if(!currentMemberName.equals(memberName)) {
                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
-                String path = getShardActorPath(shardName, currentMemberName);
-                peerAddresses.put(shardId.toString(), path);
+                String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
+                peerAddresses.put(shardId.toString(), address);
             }
         }
         return peerAddresses;
@@ -635,14 +738,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     public SupervisorStrategy supervisorStrategy() {
 
         return new OneForOneStrategy(10, Duration.create("1 minute"),
-            new Function<Throwable, SupervisorStrategy.Directive>() {
-                @Override
-                public SupervisorStrategy.Directive apply(Throwable t) {
-                    LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
-                    return SupervisorStrategy.resume();
-                }
+                new Function<Throwable, SupervisorStrategy.Directive>() {
+            @Override
+            public SupervisorStrategy.Directive apply(Throwable t) {
+                LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+                return SupervisorStrategy.resume();
             }
-        );
+        }
+                );
 
     }
 
@@ -652,18 +755,181 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     @VisibleForTesting
-    Collection<String> getKnownModules() {
-        return knownModules;
+    ShardManagerInfoMBean getMBean(){
+        return mBean;
     }
 
-    @VisibleForTesting
-    DataPersistenceProvider getDataPersistenceProvider() {
-        return dataPersistenceProvider;
+    private void checkLocalShardExists(final String shardName, final ActorRef sender) {
+        if (localShards.containsKey(shardName)) {
+            String msg = String.format("Local shard %s already exists", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+        }
     }
 
-    @VisibleForTesting
-    ShardManagerInfoMBean getMBean(){
-        return mBean;
+    private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
+        final String shardName = shardReplicaMsg.getShardName();
+
+        // verify the local shard replica is already available in the controller node
+        LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
+
+        checkLocalShardExists(shardName, getSender());
+
+        // verify the shard with the specified name is present in the cluster configuration
+        if (!(this.configuration.isShardConfigured(shardName))) {
+            String msg = String.format("No module configuration exists for shard %s", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+            return;
+        }
+
+        // Create the localShard
+        if (schemaContext == null) {
+            String msg = String.format(
+                  "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            return;
+        }
+
+        Map<String, String> peerAddresses = getPeerAddresses(shardName);
+        if (peerAddresses.isEmpty()) {
+            String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            return;
+        }
+
+        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+                getShardInitializationTimeout().duration().$times(2));
+
+        final ActorRef sender = getSender();
+        Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("Failed to find leader for shard %s", shardName), failure)),
+                        getSelf());
+                } else {
+                    if (!(response instanceof RemotePrimaryShardFound)) {
+                        String msg = String.format("Failed to find leader for shard %s: received response: %s",
+                                shardName, response);
+                        LOG.debug ("{}: {}", persistenceId(), msg);
+                        sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf());
+                        return;
+                    }
+
+                    RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
+                    addShard (shardName, message, sender);
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
+        checkLocalShardExists(shardName, sender);
+
+        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+
+        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+                DisableElectionsRaftPolicy.class.getName()).build();
+
+        final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
+                          getPeerAddresses(shardName), datastoreContext,
+                          new DefaultShardPropsCreator(), peerAddressResolver);
+        localShards.put(shardName, shardInfo);
+        shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+
+        //inform ShardLeader to add this shard as a replica by sending an AddServer message
+        LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+                response.getPrimaryPath(), shardId);
+
+        Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
+        Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+            new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object addServerResponse) {
+                if (failure != null) {
+                    LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+                            response.getPrimaryPath(), shardName, failure);
+
+                    // Remove the shard
+                    localShards.remove(shardName);
+                    if (shardInfo.getActor() != null) {
+                        shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+                    }
+
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("AddServer request to leader %s for shard %s failed",
+                            response.getPrimaryPath(), shardName), failure)), getSelf());
+                } else {
+                    AddServerReply reply = (AddServerReply)addServerResponse;
+                    onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).
+            getDispatcher(Dispatchers.DispatcherType.Client));
+        return;
+    }
+
+    private void onAddServerReply (String shardName, ShardInformation shardInfo,
+                                   AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+        LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+
+        if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+            LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+
+            // Make the local shard voting capable
+            shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+
+            mBean.addLocalShard(shardInfo.getShardId().toString());
+            sender.tell(new akka.actor.Status.Success(true), getSelf());
+        } else {
+            LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+                    persistenceId(), shardName, replyMsg.getStatus());
+
+            //remove the local replica created
+            localShards.remove(shardName);
+            if (shardInfo.getActor() != null) {
+                shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+            }
+            switch (replyMsg.getStatus()) {
+                case TIMEOUT:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                            leaderPath, shardName))), getSelf());
+                    break;
+                case NO_LEADER:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+                        "There is no shard leader available for shard %s", shardName))), getSelf());
+                    break;
+                default :
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+                        "AddServer request to leader %s for shard %s failed with status %s",
+                        leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+            }
+        }
+    }
+
+    private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
+        String shardName = shardReplicaMsg.getShardName();
+
+        // verify the local shard replica is available in the controller node
+        if (!localShards.containsKey(shardName)) {
+            String msg = String.format("Local shard %s does not", shardName);
+            LOG.debug ("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+            return;
+        }
+        // call RemoveShard for the shardName
+        getSender().tell(new akka.actor.Status.Success(true), getSelf());
+        return;
     }
 
     @VisibleForTesting
@@ -672,8 +938,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final String shardName;
         private ActorRef actor;
         private ActorPath actorPath;
-        private final Map<String, String> peerAddresses;
+        private final Map<String, String> initialPeerAddresses;
         private Optional<DataTree> localShardDataTree;
+        private boolean leaderAvailable = false;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
@@ -683,12 +950,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
         private String role ;
         private String leaderId;
+        private short leaderVersion;
+
+        private DatastoreContext datastoreContext;
+        private final ShardPropsCreator shardPropsCreator;
+        private final ShardPeerAddressResolver addressResolver;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
-                Map<String, String> peerAddresses) {
+                Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
+                ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) {
             this.shardName = shardName;
             this.shardId = shardId;
-            this.peerAddresses = peerAddresses;
+            this.initialPeerAddresses = initialPeerAddresses;
+            this.datastoreContext = datastoreContext;
+            this.shardPropsCreator = shardPropsCreator;
+            this.addressResolver = addressResolver;
+        }
+
+        Props newProps(SchemaContext schemaContext) {
+            return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext);
         }
 
         String getShardName() {
@@ -720,26 +1000,42 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return localShardDataTree;
         }
 
-        Map<String, String> getPeerAddresses() {
-            return peerAddresses;
+        DatastoreContext getDatastoreContext() {
+            return datastoreContext;
+        }
+
+        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+            this.datastoreContext = datastoreContext;
+            if (actor != null) {
+                LOG.debug ("Sending new DatastoreContext to {}", shardId);
+                actor.tell(this.datastoreContext, sender);
+            }
         }
 
         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
-            LOG.info("updatePeerAddress for peer {} with address {}", peerId,
-                peerAddress);
-            if(peerAddresses.containsKey(peerId)){
-                peerAddresses.put(peerId, peerAddress);
-
-                if(actor != null) {
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
-                                peerId, peerAddress, actor.path());
-                    }
+            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
-                    actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender);
+            if(actor != null) {
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
+                            peerId, peerAddress, actor.path());
                 }
 
-                notifyOnShardInitializedCallbacks();
+                actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
+            }
+
+            notifyOnShardInitializedCallbacks();
+        }
+
+        void peerDown(String memberName, String peerId, ActorRef sender) {
+            if(actor != null) {
+                actor.tell(new PeerDown(memberName, peerId), sender);
+            }
+        }
+
+        void peerUp(String memberName, String peerId, ActorRef sender) {
+            if(actor != null) {
+                actor.tell(new PeerUp(memberName, peerId), sender);
             }
         }
 
@@ -748,7 +1044,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
+            return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
+                    (isLeader() || addressResolver.resolve(leaderId) != null);
         }
 
         boolean isShardInitialized() {
@@ -763,7 +1060,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             if(isLeader()) {
                 return Serialization.serializedActorPath(getActor());
             } else {
-                return peerAddresses.get(leaderId);
+                return addressResolver.resolve(leaderId);
             }
         }
 
@@ -826,10 +1123,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return false;
         }
 
-        void setLeaderId(String leaderId) {
+        boolean setLeaderId(String leaderId) {
+            boolean changed = !Objects.equal(this.leaderId, leaderId);
             this.leaderId = leaderId;
-
+            if(leaderId != null) {
+                this.leaderAvailable = true;
+            }
             notifyOnShardInitializedCallbacks();
+
+            return changed;
+        }
+
+        String getLeaderId() {
+            return leaderId;
+        }
+
+        void setLeaderAvailable(boolean leaderAvailable) {
+            this.leaderAvailable = leaderAvailable;
+        }
+
+        short getLeaderVersion() {
+            return leaderVersion;
+        }
+
+        void setLeaderVersion(short leaderVersion) {
+            this.leaderVersion = leaderVersion;
         }
     }
 
@@ -838,20 +1156,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         final ClusterWrapper cluster;
         final Configuration configuration;
-        final DatastoreContext datastoreContext;
+        final DatastoreContextFactory datastoreContextFactory;
         private final CountDownLatch waitTillReadyCountdownLatch;
+        private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-        ShardManagerCreator(ClusterWrapper cluster,
-                            Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration,
+                DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
+                PrimaryShardInfoFutureCache primaryShardInfoCache) {
             this.cluster = cluster;
             this.configuration = configuration;
-            this.datastoreContext = datastoreContext;
+            this.datastoreContextFactory = datastoreContextFactory;
             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            this.primaryShardInfoCache = primaryShardInfoCache;
         }
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+            return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch,
+                    primaryShardInfoCache);
         }
     }
 
@@ -906,6 +1228,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    /**
+     * We no longer persist SchemaContextModules but keep this class around for now for backwards
+     * compatibility so we don't get de-serialization failures on upgrade from Helium.
+     */
+    @Deprecated
     static class SchemaContextModules implements Serializable {
         private static final long serialVersionUID = -8884620101025936590L;