Make Netty-3 dependency optional
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
index 961f32db0478eb583fc5232ee96f1483317f133e..889f1d47e1f8c2c55437d28180e73b684d1629f1 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -14,16 +15,18 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.AddressFromURIString;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
@@ -40,13 +43,13 @@ import akka.serialization.Serialization;
 import akka.testkit.TestActorRef;
 import akka.testkit.javadsl.TestKit;
 import akka.util.Timeout;
-import com.google.common.base.Function;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
-import java.net.URI;
+import java.time.Duration;
 import java.util.AbstractMap;
 import java.util.Arrays;
 import java.util.Collection;
@@ -59,18 +62,25 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
+import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
+import org.opendaylight.controller.cluster.datastore.AbstractClusterRefActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
@@ -89,8 +99,6 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
-import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
@@ -107,6 +115,7 @@ import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
@@ -120,23 +129,39 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.opendaylight.yangtools.yang.common.XMLNamespace;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-public class ShardManagerTest extends AbstractShardManagerTest {
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class ShardManagerTest extends AbstractClusterRefActorTest {
     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
+    private static final MemberName MEMBER_1 = MemberName.forName("member-1");
     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
 
-    private static SchemaContext TEST_SCHEMA_CONTEXT;
+    private static int ID_COUNTER = 1;
+    private static ActorRef mockShardActor;
+    private static ShardIdentifier mockShardName;
+    private static SettableFuture<Empty> ready;
+    private static EffectiveModelContext TEST_SCHEMA_CONTEXT;
+
+    private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
+    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+    private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
+            .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
+            .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
 
     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 
@@ -150,6 +175,50 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         TEST_SCHEMA_CONTEXT = null;
     }
 
+    @Before
+    public void setUp() {
+        ready = SettableFuture.create();
+
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+
+        if (mockShardActor == null) {
+            mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
+            mockShardActor = getSystem().actorOf(MessageCollectorActor.props(), mockShardName.toString());
+        }
+
+        MessageCollectorActor.clearMessages(mockShardActor);
+    }
+
+    @After
+    public void tearDown() {
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+
+        mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        await().atMost(Duration.ofSeconds(10)).until(mockShardActor::isTerminated);
+        mockShardActor = null;
+
+        actorFactory.close();
+    }
+
+    private TestShardManager.Builder newTestShardMgrBuilder() {
+        return TestShardManager.builder(datastoreContextBuilder)
+            .distributedDataStore(mock(ClientBackedDataStore.class));
+    }
+
+    private TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) {
+        return newTestShardMgrBuilder().configuration(config);
+    }
+
+    private Props newShardMgrProps() {
+        return newShardMgrProps(new MockConfiguration());
+    }
+
+    private Props newShardMgrProps(final Configuration config) {
+        return newTestShardMgrBuilder(config).readinessFuture(ready).props();
+    }
+
     private ActorSystem newActorSystem(final String config) {
         return newActorSystem("cluster-test", config);
     }
@@ -163,10 +232,6 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         return system.actorOf(MessageCollectorActor.props(), name);
     }
 
-    private Props newShardMgrProps() {
-        return newShardMgrProps(new MockConfiguration());
-    }
-
     private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
         doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
@@ -179,8 +244,9 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     }
 
     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
-        return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
-                .distributedDataStore(mock(DistributedDataStore.class));
+        return TestShardManager.builder(datastoreContextBuilder)
+            .shardActor(shardActor)
+            .distributedDataStore(mock(ClientBackedDataStore.class));
     }
 
 
@@ -292,7 +358,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             }
         }
 
-        final Creator<ShardManager> creator = new Creator<ShardManager>() {
+        final Creator<ShardManager> creator = new Creator<>() {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() {
@@ -304,7 +370,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         final TestKit kit = new TestKit(getSystem());
 
-        final ActorRef shardManager = actorFactory.createActor(Props.create(
+        final ActorRef shardManager = actorFactory.createActor(Props.create(ShardManager.class,
                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
@@ -346,7 +412,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), PrimaryNotFoundException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
     }
 
     @Test
@@ -358,7 +424,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
         DataTree mockDataTree = mock(DataTree.class);
         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
@@ -371,7 +437,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
 
-        LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
+        LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
             LocalPrimaryShardFound.class);
         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
@@ -387,7 +453,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
@@ -399,7 +465,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
 
         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
     }
@@ -411,7 +477,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
         MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
@@ -425,8 +491,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
 
-        RemotePrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
-            RemotePrimaryShardFound.class);
+        RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
             primaryFound.getPrimaryPath().contains("member-2-shard-default"));
         assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
@@ -441,7 +506,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
     }
 
     @Test
@@ -450,11 +515,11 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
     }
 
     @Test
@@ -464,7 +529,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
         shardManager.tell(
@@ -473,7 +538,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
 
         DataTree mockDataTree = mock(DataTree.class);
         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
@@ -481,7 +546,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
 
-        LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
+        LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
             LocalPrimaryShardFound.class);
         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
@@ -505,30 +570,29 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         // RoleChangeNotification.
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
 
-        kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+        kit.expectNoMessage(Duration.ofMillis(150));
 
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
-        kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+        kit.expectNoMessage(Duration.ofMillis(150));
 
         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
         shardManager.tell(
             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
             mockShardActor);
 
-        kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+        kit.expectNoMessage(Duration.ofMillis(150));
 
         DataTree mockDataTree = mock(DataTree.class);
         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
             DataStoreVersions.CURRENT_VERSION), mockShardActor);
 
-        LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
-            LocalPrimaryShardFound.class);
+        LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
 
-        kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+        kit.expectNoMessage(Duration.ofMillis(200));
 
         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
     }
@@ -543,11 +607,11 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("2 seconds"), NotInitializedException.class);
+        kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
 
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
-        kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+        kit.expectNoMessage(Duration.ofMillis(200));
 
         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
     }
@@ -559,13 +623,13 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
             RaftState.Candidate.name()), mockShardActor);
 
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
+        kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
 
         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
     }
@@ -577,13 +641,13 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
             RaftState.IsolatedLeader.name()), mockShardActor);
 
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
 
-        kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
+        kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
 
         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
     }
@@ -595,11 +659,11 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
+        kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
 
         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
     }
@@ -640,7 +704,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
 
-        shardManager2.tell(new ActorInitialized(), mockShardActor2);
+        shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender());
 
         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
@@ -652,7 +716,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         shardManager1.underlyingActor().waitForMemberUp();
         shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
 
-        RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+        RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
         String path = found.getPrimaryPath();
         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
         assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
@@ -667,7 +731,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 //
 //                shardManager1.tell(new FindPrimary("astronauts", false), getRef());
 //
-//                expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
+//                expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
 
         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
     }
@@ -708,8 +772,8 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final TestKit kit = new TestKit(system1);
         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager1.tell(new ActorInitialized(), mockShardActor1);
-        shardManager2.tell(new ActorInitialized(), mockShardActor2);
+        shardManager1.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender());
+        shardManager2.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender());
 
         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
@@ -727,7 +791,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
 
-        RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+        RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
         String path = found.getPrimaryPath();
         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
 
@@ -735,40 +799,29 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             kit.getRef());
 
         shardManager1.underlyingActor().waitForUnreachableMember();
-
-        PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
-        assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
         MessageCollectorActor.clearMessages(mockShardActor1);
 
         shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
             kit.getRef());
 
-        MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
-
         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
 
         shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
             kit.getRef());
 
         shardManager1.underlyingActor().waitForReachableMember();
 
-        PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
-        assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
-        MessageCollectorActor.clearMessages(mockShardActor1);
-
         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
 
-        RemotePrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+        RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
         String path1 = found1.getPrimaryPath();
         assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
 
         shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
             kit.getRef());
 
-        MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
-
         // Test FindPrimary wait succeeds after reachable member event.
 
         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
@@ -780,7 +833,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         shardManager1.tell(
             MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
 
-        RemotePrimaryShardFound found2 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+        RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
         String path2 = found2.getPrimaryPath();
         assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
 
@@ -825,8 +878,8 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final TestKit kit = new TestKit(system1);
         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager1.tell(new ActorInitialized(), mockShardActor1);
-        shardManager2.tell(new ActorInitialized(), mockShardActor2);
+        shardManager1.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender());
+        shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender());
 
         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
@@ -844,7 +897,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
 
-        RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+        RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
         String path = found.getPrimaryPath();
         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
 
@@ -858,7 +911,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
 
         assertNull("Expected primaryShardInfoCache entry removed",
             primaryShardInfoCache.getIfPresent("default"));
@@ -871,7 +924,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
 
-        LocalPrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), LocalPrimaryShardFound.class);
+        LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
         String path1 = found1.getPrimaryPath();
         assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
 
@@ -921,8 +974,8 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final TestKit kit256 = new TestKit(system256);
         shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
-        shardManager256.tell(new ActorInitialized(), mockShardActor256);
-        shardManager2.tell(new ActorInitialized(), mockShardActor2);
+        shardManager256.tell(new ActorInitialized(mockShardActor256), ActorRef.noSender());
+        shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender());
 
         String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
         String memberId2   = "member-2-shard-default-"   + shardMrgIDSuffix;
@@ -940,8 +993,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
 
-        LocalPrimaryShardFound found = kit256.expectMsgClass(kit256.duration("5 seconds"),
-            LocalPrimaryShardFound.class);
+        LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
         String path = found.getPrimaryPath();
         assertTrue("Unexpected primary path " + path + " which must on member-256",
             path.contains("member-256-shard-default-config"));
@@ -957,7 +1009,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         // Make sure leader shard on member-256 is still leader and still in the cache.
         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
-        found = kit256.expectMsgClass(kit256.duration("5 seconds"), LocalPrimaryShardFound.class);
+        found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
         path = found.getPrimaryPath();
         assertTrue("Unexpected primary path " + path + " which must still not on member-256",
             path.contains("member-256-shard-default-config"));
@@ -986,7 +1038,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
 
-        LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
+        LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
 
         assertEquals("getShardName", "non-existent", notFound.getShardName());
     }
@@ -997,11 +1049,11 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
 
-        LocalShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+        LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
 
         assertTrue("Found path contains " + found.getPath().path().toString(),
             found.getPath().path().toString().contains("member-1-shard-default-config"));
@@ -1014,7 +1066,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
     }
 
     @Test
@@ -1031,7 +1083,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
             new Timeout(5, TimeUnit.SECONDS));
 
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
         Object resp = Await.result(future, kit.duration("5 seconds"));
         assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
@@ -1044,15 +1096,13 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         TestShardManager shardManager = newTestShardManager();
 
         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
-        shardManager.onReceiveCommand(new RoleChangeNotification(
+        shardManager.handleCommand(new RoleChangeNotification(
                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
+        assertFalse(ready.isDone());
 
-        verify(ready, never()).countDown();
-
-        shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
+        shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
-
-        verify(ready, times(1)).countDown();
+        assertTrue(ready.isDone());
     }
 
     @Test
@@ -1061,17 +1111,15 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         TestShardManager shardManager = newTestShardManager();
 
         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
-        shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
+        shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
+        assertFalse(ready.isDone());
 
-        verify(ready, never()).countDown();
+        shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
 
-        shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
-
-        shardManager.onReceiveCommand(
+        shardManager.handleCommand(
             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
-
-        verify(ready, times(1)).countDown();
+        assertTrue(ready.isDone());
     }
 
     @Test
@@ -1080,27 +1128,24 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         TestShardManager shardManager = newTestShardManager();
 
         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
-        shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
+        shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
+        assertFalse(ready.isDone());
 
-        verify(ready, never()).countDown();
-
-        shardManager.onReceiveCommand(
+        shardManager.handleCommand(
             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
 
-        shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
-
-        verify(ready, times(1)).countDown();
+        shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
+        assertTrue(ready.isDone());
     }
 
     @Test
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
         TestShardManager shardManager = newTestShardManager();
 
-        shardManager.onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
+        shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
             RaftState.Leader.name()));
-
-        verify(ready, never()).countDown();
+        assertFalse(ready.isDone());
     }
 
     @Test
@@ -1114,7 +1159,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
         TestShardManager shardManager = newTestShardManager();
 
-        shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+        shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         assertTrue(shardManager.getMBean().getSyncStatus());
@@ -1125,13 +1170,13 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         TestShardManager shardManager = newTestShardManager();
 
         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
-        shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
+        shardManager.handleCommand(new RoleChangeNotification(shardId,
                 RaftState.Follower.name(), RaftState.Candidate.name()));
 
         assertFalse(shardManager.getMBean().getSyncStatus());
 
         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
-        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
+        shardManager.handleCommand(new FollowerInitialSyncUpStatus(
                 true, shardId));
 
         assertFalse(shardManager.getMBean().getSyncStatus());
@@ -1142,19 +1187,19 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         TestShardManager shardManager = newTestShardManager();
 
         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
-        shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
+        shardManager.handleCommand(new RoleChangeNotification(shardId,
                 RaftState.Candidate.name(), RaftState.Follower.name()));
 
         // Initially will be false
         assertFalse(shardManager.getMBean().getSyncStatus());
 
         // Send status true will make sync status true
-        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
+        shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
 
         assertTrue(shardManager.getMBean().getSyncStatus());
 
         // Send status false will make sync status false
-        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
+        shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
 
         assertFalse(shardManager.getMBean().getSyncStatus());
     }
@@ -1174,7 +1219,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         // Make default shard leader
         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
-        shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
+        shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // default = Leader, astronauts is unknown so sync status remains false
@@ -1182,21 +1227,21 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         // Make astronauts shard leader as well
         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
-        shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+        shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // Now sync status should be true
         assertTrue(shardManager.getMBean().getSyncStatus());
 
         // Make astronauts a Follower
-        shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+        shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
                 RaftState.Leader.name(), RaftState.Follower.name()));
 
         // Sync status is not true
         assertFalse(shardManager.getMBean().getSyncStatus());
 
         // Make the astronauts follower sync status true
-        shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
+        shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
 
         // Sync status is now true
         assertTrue(shardManager.getMBean().getSyncStatus());
@@ -1210,7 +1255,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
         shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
 
@@ -1235,22 +1280,22 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
 
-        SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
+        EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
 
         DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
                 .persistent(false).build();
         Shard.Builder shardBuilder = Shard.builder();
 
-        ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+        ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
             "foo", null, members("member-1", "member-5", "member-6"));
         shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
 
         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
 
         assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
         assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
@@ -1268,7 +1313,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
 
-        Success success = kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+        Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
         assertNotNull("Success status is null", success.status());
 
         LOG.info("testOnCreateShard ending");
@@ -1287,14 +1332,14 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
 
         Shard.Builder shardBuilder = Shard.builder();
-        ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+        ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
             "foo", null, members("member-5", "member-6"));
 
         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
-        kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
 
         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
-        kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
 
         assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
         assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
@@ -1313,18 +1358,18 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         Shard.Builder shardBuilder = Shard.builder();
 
-        ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+        ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
             "foo", null, members("member-1"));
         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
 
-        SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
+        EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
 
         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
 
-        kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
 
         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
         assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
@@ -1360,10 +1405,8 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
 
-        Function<ShardSnapshot, String> shardNameTransformer = ShardSnapshot::getName;
-
         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
-            datastoreSnapshot.getShardSnapshots().stream().map(shardNameTransformer).collect(Collectors.toSet())));
+            datastoreSnapshot.getShardSnapshots().stream().map(ShardSnapshot::getName).collect(Collectors.toSet())));
 
         // Add a new replica
 
@@ -1384,7 +1427,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
 
         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
-                Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
+                Lists.transform(datastoreSnapshot.getShardSnapshots(), ShardSnapshot::getName)));
 
         ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
         assertNotNull("Expected ShardManagerSnapshot", snapshot);
@@ -1407,7 +1450,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                 .put("astronauts", Collections.<String>emptyList()).build());
 
         ShardManagerSnapshot snapshot =
-                new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
+                new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
                 Collections.<ShardSnapshot>emptyList());
         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
@@ -1442,7 +1485,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
 
         shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
-        Status.Failure resp = kit.expectMsgClass(kit.duration("2 seconds"), Status.Failure.class);
+        Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
 
         assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
     }
@@ -1488,7 +1531,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
 
-        leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
+        leaderShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender());
 
         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
         leaderShardManager.tell(
@@ -1505,7 +1548,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         // persisted.
         String[] restoredShards = { "default", "people" };
         ShardManagerSnapshot snapshot =
-                new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
+                new ShardManagerSnapshot(Arrays.asList(restoredShards));
         InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
 
@@ -1518,7 +1561,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             AddServer.class);
         String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
         assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
-        kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
 
         InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
         InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
@@ -1539,7 +1582,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
 
         String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
         AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
@@ -1560,17 +1603,17 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
 
-        Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+        Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
 
         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
-        kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
 
         // Send message again to verify previous in progress state is
         // cleared
 
         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
-        resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+        resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
 
         // Send message again with an AddServer timeout to verify the
@@ -1581,10 +1624,10 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
         leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
-        kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
 
         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
-        kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
 
         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
     }
@@ -1597,7 +1640,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), mockShardActor);
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
             DataStoreVersions.CURRENT_VERSION), kit.getRef());
         shardManager.tell(
@@ -1605,11 +1648,11 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             mockShardActor);
 
         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
-        Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+        Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
 
         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
-        kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
 
         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
     }
@@ -1641,18 +1684,18 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             addServerMsg.getNewServerId());
         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
 
-        Failure failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+        Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
         assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
 
         shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
-        kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
 
         terminateWatcher.expectTerminated(mockNewReplicaShardActor);
 
         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
         mockShardLeaderKit.expectMsgClass(AddServer.class);
         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
-        failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+        failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
         assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
 
         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
@@ -1680,7 +1723,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
 
         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
-        Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
+        Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
         assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
 
         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
@@ -1694,7 +1737,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
 
         shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
-        Status.Failure resp = kit.expectMsgClass(kit.duration("10 seconds"), Status.Failure.class);
+        Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
         assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
     }
 
@@ -1712,7 +1755,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), respondActor);
+        shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender());
         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
             DataStoreVersions.CURRENT_VERSION), kit.getRef());
         shardManager.tell(
@@ -1724,7 +1767,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             RemoveServer.class);
         assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
             removeServer.getServerId());
-        kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
     }
 
     @Test
@@ -1784,8 +1827,8 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
 
-        leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
-        newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
+        leaderShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender());
+        newReplicaShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender());
 
         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
         leaderShardManager.tell(
@@ -1812,7 +1855,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             RemoveServer.class);
         String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
         assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
-        kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
     }
 
     @Test
@@ -1854,7 +1897,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.tell(secondServerChange, secondRequestKit.getRef());
 
-        secondRequestKit.expectMsgClass(secondRequestKit.duration("5 seconds"), Failure.class);
+        secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
     }
 
     @Test
@@ -1871,10 +1914,10 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.underlyingActor().waitForRecoveryComplete();
         shardManager.tell(new FindLocalShard("people", false), kit.getRef());
-        kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
 
         shardManager.tell(new FindLocalShard("default", false), kit.getRef());
-        kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
 
         // Removed the default shard replica from member-1
         ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
@@ -1906,7 +1949,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         shardManager.underlyingActor().waitForRecoveryComplete();
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), shard);
+        shardManager.tell(new ActorInitialized(shard), ActorRef.noSender());
 
         waitForShardInitialized(shardManager, "people", kit);
         waitForShardInitialized(shardManager, "default", kit);
@@ -1932,7 +1975,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                     .put("people", Arrays.asList("member-1", "member-2")).build());
         String[] restoredShards = {"default", "astronauts"};
         ShardManagerSnapshot snapshot =
-                new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
+                new ShardManagerSnapshot(Arrays.asList(restoredShards));
         InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
 
         // create shardManager to come up with restored data
@@ -1942,7 +1985,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
 
         newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
-        LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
+        LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
         assertEquals("for uninitialized shard", "people", notFound.getShardName());
 
         // Verify a local shard is created for the restored shards,
@@ -1950,10 +1993,10 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         // as the actor initialization
         // message is not sent for them
         newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
-        kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
 
         newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
-        kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
 
         LOG.info("testShardPersistenceWithRestoredData ending");
     }
@@ -1975,8 +2018,8 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), shard1);
-        shardManager.tell(new ActorInitialized(), shard2);
+        shardManager.tell(new ActorInitialized(shard1), ActorRef.noSender());
+        shardManager.tell(new ActorInitialized(shard2), ActorRef.noSender());
 
         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
         Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
@@ -2012,7 +2055,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), respondActor);
+        shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender());
         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
             DataStoreVersions.CURRENT_VERSION), kit.getRef());
         shardManager.tell(
@@ -2029,7 +2072,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
                 Boolean.TRUE));
 
-        kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+        kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
     }
 
     @Test
@@ -2044,7 +2087,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
 
         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
-        shardManager.tell(new ActorInitialized(), respondActor);
+        shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender());
         shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
 
         shardManager.tell(
@@ -2052,10 +2095,65 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
 
-        Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
+        Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
         assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testRegisterForShardLeaderChanges() {
+        LOG.info("testRegisterForShardLeaderChanges starting");
+
+        final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+        final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+        final TestKit kit = new TestKit(getSystem());
+        final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
+
+        shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
+        shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender());
+
+        final Consumer<String> mockCallback = mock(Consumer.class);
+        shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
+
+        final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
+        final Registration reg = (Registration) reply.status();
+
+        final DataTree mockDataTree = mock(DataTree.class);
+        shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+            DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+        verify(mockCallback, timeout(5000)).accept("default");
+
+        reset(mockCallback);
+        shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+                DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verifyNoMoreInteractions(mockCallback);
+
+        shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
+                DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+        verify(mockCallback, timeout(5000)).accept("default");
+
+        reset(mockCallback);
+        shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
+                DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+        verify(mockCallback, timeout(5000)).accept("default");
+
+        reset(mockCallback);
+        reg.close();
+
+        shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+                DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verifyNoMoreInteractions(mockCallback);
+
+        LOG.info("testRegisterForShardLeaderChanges ending");
+    }
+
     public static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
@@ -2168,7 +2266,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
             }
 
             Builder shardActor(final ActorRef newShardActor) {
-                this.shardActor = newShardActor;
+                shardActor = newShardActor;
                 return this;
             }
 
@@ -2211,7 +2309,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         AbstractGenericCreator(final Class<C> shardManagerClass) {
             this.shardManagerClass = shardManagerClass;
-            cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
+            cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready)
                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
         }