Implement suspend leader in Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index f097c19e512a3766a3d836c5cc7e061862b29666..ae751fa65d2b0a3a0917e672a1dfeab59f52a670 100644 (file)
@@ -49,6 +49,7 @@ import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -60,7 +61,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
@@ -95,6 +95,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -279,7 +280,7 @@ public class ShardTest extends AbstractShardTest {
             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
 
-            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
+            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
 
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterDataTreeChangeListenerReply.class);
@@ -347,7 +348,7 @@ public class ShardTest extends AbstractShardTest {
             assertEquals("Got first ElectionTimeout", true,
                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
-            shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
+            shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                 RegisterDataTreeChangeListenerReply.class);
             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
@@ -550,8 +551,8 @@ public class ShardTest extends AbstractShardTest {
         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
 
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-            SerializationUtils.serializeNormalizedNode(root),
-            Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+                SerializationUtils.serializeNormalizedNode(root),
+                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
         return testStore;
     }
 
@@ -678,11 +679,7 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.class));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
@@ -694,14 +691,10 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send the ForwardedReadyTransaction for the next 2 Tx's.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
@@ -1132,11 +1125,42 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
+    public void testTransactionMessagesWithNoLeader() {
+        new ShardTestKit(getSystem()) {{
+            dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
+                shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionMessagesWithNoLeader");
+
+            waitUntilNoLeader(shard);
+
+            shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
+            Failure failure = expectMsgClass(Failure.class);
+            assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+
+            shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
+                    DataStoreVersions.CURRENT_VERSION, true), getRef());
+            failure = expectMsgClass(Failure.class);
+            assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+
+            shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
+            failure = expectMsgClass(Failure.class);
+            assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+        }};
+    }
+
+    @Test
+    public void testReadyWithImmediateCommit() throws Exception{
+        testReadyWithImmediateCommit(true);
+        testReadyWithImmediateCommit(false);
+    }
+
+    public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testForwardedReadyTransactionWithImmediateCommit");
+                    "testReadyWithImmediateCommit-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1150,11 +1174,7 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
 
             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
 
@@ -1250,11 +1270,16 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCommitWithPersistenceDisabled() throws Throwable {
+        testCommitWithPersistenceDisabled(true);
+        testCommitWithPersistenceDisabled(false);
+    }
+
+    public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
         dataStoreContextBuilder.persistent(false);
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCommitWithPersistenceDisabled");
+                    "testCommitWithPersistenceDisabled-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1270,11 +1295,7 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1321,14 +1342,19 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testCommitWhenTransactionHasNoModifications(){
+    public void testCommitWhenTransactionHasNoModifications() {
+        testCommitWhenTransactionHasNoModifications(true);
+        testCommitWhenTransactionHasNoModifications(false);
+    }
+
+    public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
         // Note that persistence is enabled which would normally result in the entry getting written to the journal
         // but here that need not happen
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testCommitWhenTransactionHasNoModifications");
+                        "testCommitWhenTransactionHasNoModifications-" + readWrite);
 
                 waitUntilLeader(shard);
 
@@ -1342,11 +1368,7 @@ public class ShardTest extends AbstractShardTest {
 
                 final FiniteDuration duration = duration("5 seconds");
 
-                // Simulate the ForwardedReadyTransaction messages that would be sent
-                // by the ShardTransaction.
-
-                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                        cohort, modification, true, false), getRef());
+                shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
                 expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
@@ -1381,12 +1403,17 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testCommitWhenTransactionHasModifications(){
+    public void testCommitWhenTransactionHasModifications() {
+        testCommitWhenTransactionHasModifications(true);
+        testCommitWhenTransactionHasModifications(false);
+    }
+
+    public void testCommitWhenTransactionHasModifications(final boolean readWrite){
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testCommitWhenTransactionHasModifications");
+                        "testCommitWhenTransactionHasModifications-" + readWrite);
 
                 waitUntilLeader(shard);
 
@@ -1401,11 +1428,7 @@ public class ShardTest extends AbstractShardTest {
 
                 final FiniteDuration duration = duration("5 seconds");
 
-                // Simulate the ForwardedReadyTransaction messages that would be sent
-                // by the ShardTransaction.
-
-                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                        cohort, modification, true, false), getRef());
+                shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
                 expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
@@ -1441,10 +1464,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCommitPhaseFailure() throws Throwable {
+        testCommitPhaseFailure(true);
+        testCommitPhaseFailure(false);
+    }
+
+    public void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCommitPhaseFailure");
+                    "testCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1467,15 +1495,10 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1521,10 +1544,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testPreCommitPhaseFailure() throws Throwable {
+        testPreCommitPhaseFailure(true);
+        testPreCommitPhaseFailure(false);
+    }
+
+    public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testPreCommitPhaseFailure");
+                    "testPreCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1542,15 +1570,10 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1595,10 +1618,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCanCommitPhaseFailure() throws Throwable {
+        testCanCommitPhaseFailure(true);
+        testCanCommitPhaseFailure(false);
+    }
+
+    public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCanCommitPhaseFailure");
+                    "testCanCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1609,11 +1637,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1628,8 +1652,7 @@ public class ShardTest extends AbstractShardTest {
             final String transactionID2 = "tx2";
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@@ -1643,10 +1666,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCanCommitPhaseFalseResponse() throws Throwable {
+        testCanCommitPhaseFalseResponse(true);
+        testCanCommitPhaseFalseResponse(false);
+    }
+
+    public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCanCommitPhaseFalseResponse");
+                    "testCanCommitPhaseFalseResponse-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1657,11 +1685,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1678,8 +1702,7 @@ public class ShardTest extends AbstractShardTest {
             final String transactionID2 = "tx2";
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@@ -1693,10 +1716,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
+        testImmediateCommitWithCanCommitPhaseFailure(true);
+        testImmediateCommitWithCanCommitPhaseFailure(false);
+    }
+
+    public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testImmediateCommitWithCanCommitPhaseFailure");
+                    "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1707,11 +1735,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
 
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
@@ -1729,8 +1753,7 @@ public class ShardTest extends AbstractShardTest {
             doReturn(candidateRoot).when(candidate).getRootNode();
             doReturn(candidate).when(cohort).getCandidate();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1740,10 +1763,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
+        testImmediateCommitWithCanCommitPhaseFalseResponse(true);
+        testImmediateCommitWithCanCommitPhaseFalseResponse(false);
+    }
+
+    public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testImmediateCommitWithCanCommitPhaseFalseResponse");
+                    "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1754,11 +1782,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
 
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
@@ -1776,8 +1800,7 @@ public class ShardTest extends AbstractShardTest {
             doReturn(candidateRoot).when(candidate).getRootNode();
             doReturn(candidate).when(cohort).getCandidate();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1787,10 +1810,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testAbortBeforeFinishCommit() throws Throwable {
+        testAbortBeforeFinishCommit(true);
+        testAbortBeforeFinishCommit(false);
+    }
+
+    public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortBeforeFinishCommit");
+                    "testAbortBeforeFinishCommit-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1823,8 +1851,7 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                     modification, preCommit);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
@@ -1848,12 +1875,17 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testTransactionCommitTimeout() throws Throwable {
+        testTransactionCommitTimeout(true);
+        testTransactionCommitTimeout(false);
+    }
+
+    public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testTransactionCommitTimeout");
+                    "testTransactionCommitTimeout-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1888,12 +1920,10 @@ public class ShardTest extends AbstractShardTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
@@ -1957,18 +1987,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // The 3rd Tx should exceed queue capacity and fail.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // canCommit 1st Tx.
@@ -2009,8 +2036,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             final String transactionID2 = "tx2";
@@ -2018,8 +2044,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             final String transactionID3 = "tx3";
@@ -2027,8 +2052,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                cohort3, modification3, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
@@ -2061,8 +2085,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // CanCommit the first one so it's the current in-progress CohortEntry.
@@ -2077,8 +2100,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Ready the third Tx.
@@ -2125,10 +2147,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testAbortCurrentTransaction() throws Throwable {
+        testAbortCurrentTransaction(true);
+        testAbortCurrentTransaction(false);
+    }
+
+    public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortCurrentTransaction");
+                    "testAbortCurrentTransaction-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -2148,15 +2175,10 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -2192,6 +2214,11 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testAbortQueuedTransaction() throws Throwable {
+        testAbortQueuedTransaction(true);
+        testAbortQueuedTransaction(false);
+    }
+
+    public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
@@ -2215,7 +2242,7 @@ public class ShardTest extends AbstractShardTest {
 
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
-                            Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+                            Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -2229,8 +2256,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Ready the tx.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
@@ -2517,146 +2543,150 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
+    public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
-            dataStoreContextBuilder.persistent(false);
-            final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
-            final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
-            final Creator<Shard> creator = new Creator<Shard>() {
-                private static final long serialVersionUID = 1L;
-                boolean firstElectionTimeout = true;
-
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(newShardBuilder()) {
-                        @Override
-                        public void onReceiveCommand(final Object message) throws Exception {
-                            if(message instanceof ElectionTimeout && firstElectionTimeout) {
-                                firstElectionTimeout = false;
-                                final ActorRef self = getSelf();
-                                new Thread() {
-                                    @Override
-                                    public void run() {
-                                        Uninterruptibles.awaitUninterruptibly(
-                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
-                                        self.tell(message, self);
-                                    }
-                                }.start();
-
-                                onFirstElectionTimeout.countDown();
-                            } else {
-                                super.onReceiveCommand(message);
-                            }
-                        }
-                    };
-                }
-            };
+            String testName = "testClusteredDataChangeListenerDelayedRegistration";
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
 
             final MockDataChangeListener listener = new MockDataChangeListener(1);
-            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
-                "testDataChangeListenerOnFollower-DataChangeListener");
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                    actorFactory.generateActorId(testName + "-DataChangeListener"));
 
-            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
-                    withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    actorFactory.generateActorId(testName + "-shard"));
 
-            assertEquals("Got first ElectionTimeout", true,
-                onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
-
-            shard.tell(new FindLeader(), getRef());
-            final FindLeaderReply findLeadeReply =
-                expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+            waitUntilNoLeader(shard);
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
 
             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                 RegisterChangeListenerReply.class);
-            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-            onChangeListenerRegistered.countDown();
+            shard.tell(new ElectionTimeout(), ActorRef.noSender());
 
             listener.waitForChangeEvents();
-
-            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
-    public void testClusteredDataChangeListernerRegistration() throws Exception {
-        dataStoreContextBuilder.persistent(false).build();
+    public void testClusteredDataChangeListenerRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
-            final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
-                .shardName("inventory").type("config").build();
+            String testName = "testClusteredDataChangeListenerRegistration";
+            final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+            final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+            final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+                    Shard.builder().id(followerShardID).
+                        datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+                        peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+                            "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+            final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+                    Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+                        peerAddresses(Collections.singletonMap(followerShardID.toString(),
+                            "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+            leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+            String leaderPath = waitUntilLeader(followerShard);
+            assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
 
-            final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
-                .shardName("inventory").type("config").build();
-            final Creator<Shard> followerShardCreator = new Creator<Shard>() {
-                private static final long serialVersionUID = 1L;
-
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(Shard.builder().id(member1ShardID).datastoreContext(newDatastoreContext()).
-                            peerAddresses(Collections.singletonMap(member2ShardID.toString(),
-                                    "akka://test/user/" + member2ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {
-                        @Override
-                        public void onReceiveCommand(final Object message) throws Exception {
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+            final MockDataChangeListener listener = new MockDataChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                    actorFactory.generateActorId(testName + "-DataChangeListener"));
 
-                            if(!(message instanceof ElectionTimeout)) {
-                                super.onReceiveCommand(message);
-                            }
-                        }
-                    };
-                }
-            };
+            followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
-            final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
-                private static final long serialVersionUID = 1L;
+            writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(Shard.builder().id(member2ShardID).datastoreContext(newDatastoreContext()).
-                            peerAddresses(Collections.singletonMap(member1ShardID.toString(),
-                                    "akka://test/user/" + member1ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {};
-                }
-            };
+            listener.waitForChangeEvents();
+        }};
+    }
 
+    @Test
+    public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
 
-            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(followerShardCreator)),
-                member1ShardID.toString());
+            final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+                    actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
-            final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
-                member2ShardID.toString());
-            // Sleep to let election happen
-            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    actorFactory.generateActorId(testName + "-shard"));
 
-            shard.tell(new FindLeader(), getRef());
-            final FindLeaderReply findLeaderReply =
-                    expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-            assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
+            waitUntilNoLeader(shard);
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
-            final MockDataChangeListener listener = new MockDataChangeListener(1);
-            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
-                "testDataChangeListenerOnFollower-DataChangeListener");
 
-            shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
-            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                RegisterChangeListenerReply.class);
-            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+            final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                    RegisterDataTreeChangeListenerReply.class);
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
+            shard.tell(new ElectionTimeout(), ActorRef.noSender());
+
             listener.waitForChangeEvents();
+        }};
+    }
 
-            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    @Test
+    public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            String testName = "testClusteredDataTreeChangeListenerRegistration";
+            final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+            final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+            final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+                    Shard.builder().id(followerShardID).
+                        datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+                        peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+                            "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+            final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+                    Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+                        peerAddresses(Collections.singletonMap(followerShardID.toString(),
+                            "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+            leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+            String leaderPath = waitUntilLeader(followerShard);
+            assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+            final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+                    actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+            followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+            final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                    RegisterDataTreeChangeListenerReply.class);
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+            writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            listener.waitForChangeEvents();
         }};
     }