Bug 4774: Wait for prior RO tx creates on tx chain
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index d9df305453073ec515f7427c783af21886f800e2..25e37edf714cc8c6b2c6a6cf33ccc47b854c287f 100644 (file)
@@ -40,7 +40,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -61,7 +60,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
@@ -95,6 +93,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -120,10 +119,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -148,7 +147,7 @@ public class ShardTest extends AbstractShardTest {
                     "testRegisterChangeListener-DataChangeListener");
 
             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                    dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
+                    dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
 
             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterChangeListenerReply.class);
@@ -190,8 +189,7 @@ public class ShardTest extends AbstractShardTest {
                     // this will cause all other messages to not be queued properly after that.
                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
                     // it does do a persist)
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                    return new Shard(newShardBuilder()) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
@@ -228,7 +226,7 @@ public class ShardTest extends AbstractShardTest {
                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
 
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    Props.create(new DelegatingShardCreator(creator)),
+                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testRegisterChangeListenerWhenNotLeaderInitially");
 
             // Write initial data into the in-memory store.
@@ -241,7 +239,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Now send the RegisterChangeListener and wait for the reply.
             shard.tell(new RegisterChangeListener(path, dclActor,
-                    AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+                    AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
 
             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                     RegisterChangeListenerReply.class);
@@ -280,7 +278,7 @@ public class ShardTest extends AbstractShardTest {
             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
 
-            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
+            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
 
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterDataTreeChangeListenerReply.class);
@@ -309,8 +307,8 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                    return new Shard(Shard.builder().id(shardID).datastoreContext(
+                            dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
@@ -339,18 +337,18 @@ public class ShardTest extends AbstractShardTest {
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
 
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    Props.create(new DelegatingShardCreator(creator)),
+                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             assertEquals("Got first ElectionTimeout", true,
-                    onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+                onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
-            shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
+            shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                    RegisterDataTreeChangeListenerReply.class);
+                RegisterDataTreeChangeListenerReply.class);
             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
             shard.tell(new FindLeader(), getRef());
@@ -422,12 +420,13 @@ public class ShardTest extends AbstractShardTest {
             final CountDownLatch recoveryComplete = new CountDownLatch(1);
             class TestShard extends Shard {
                 TestShard() {
-                    super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
-                            newDatastoreContext(), SCHEMA_CONTEXT);
+                    super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
+                            peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
+                            schemaContext(SCHEMA_CONTEXT));
                 }
 
-                Map<String, String> getPeerAddresses() {
-                    return getRaftActorContext().getPeerAddresses();
+                String getPeerAddress(String id) {
+                    return getRaftActorContext().getPeerAddress(id);
                 }
 
                 @Override
@@ -448,15 +447,14 @@ public class ShardTest extends AbstractShardTest {
                         }
                     })), "testPeerAddressResolved");
 
-            //waitUntilLeader(shard);
             assertEquals("Recovery complete", true,
-                    Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+                Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
 
             final String address = "akka://foobar";
             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
 
-            assertEquals("getPeerAddresses", address,
-                    ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+            assertEquals("getPeerAddress", address,
+                ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -472,7 +470,7 @@ public class ShardTest extends AbstractShardTest {
 
         testkit.waitUntilLeader(shard);
 
-        final DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         store.setSchemaContext(SCHEMA_CONTEXT);
 
         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
@@ -543,7 +541,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
-        final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+        final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         testStore.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -594,7 +592,7 @@ public class ShardTest extends AbstractShardTest {
         }
 
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
-                new ApplyJournalEntries(nListEntries));
+            new ApplyJournalEntries(nListEntries));
 
         testRecovery(listEntryKeys);
     }
@@ -610,8 +608,8 @@ public class ShardTest extends AbstractShardTest {
         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
 
         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
-                  new WriteModification(TestModel.OUTER_LIST_PATH,
-                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+            new WriteModification(TestModel.OUTER_LIST_PATH,
+                ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
         final int nListEntries = 16;
         final Set<Integer> listEntryKeys = new HashSet<>();
@@ -679,11 +677,7 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.class));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
@@ -695,14 +689,10 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send the ForwardedReadyTransaction for the next 2 Tx's.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
@@ -936,12 +926,12 @@ public class ShardTest extends AbstractShardTest {
             // Send a couple more BatchedModifications.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
+                ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
+                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1100,8 +1090,7 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT) {
+                    return new Shard(newShardBuilder()) {
                         @Override
                         protected boolean isLeader() {
                             return overrideLeaderCalls.get() ? false : super.isLeader();
@@ -1134,11 +1123,16 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
+    public void testReadyWithImmediateCommit() throws Exception{
+        testReadyWithImmediateCommit(true);
+        testReadyWithImmediateCommit(false);
+    }
+
+    public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testForwardedReadyTransactionWithImmediateCommit");
+                    "testReadyWithImmediateCommit-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1152,11 +1146,7 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
 
             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
 
@@ -1183,7 +1173,7 @@ public class ShardTest extends AbstractShardTest {
 
             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
-            final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+            final DataTreeModification modification = dataStore.newModification();
 
             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
@@ -1216,7 +1206,7 @@ public class ShardTest extends AbstractShardTest {
 
             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
-            final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+            final DataTreeModification modification = dataStore.newModification();
 
             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
@@ -1252,11 +1242,16 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCommitWithPersistenceDisabled() throws Throwable {
+        testCommitWithPersistenceDisabled(true);
+        testCommitWithPersistenceDisabled(false);
+    }
+
+    public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
         dataStoreContextBuilder.persistent(false);
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCommitWithPersistenceDisabled");
+                    "testCommitWithPersistenceDisabled-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1268,15 +1263,11 @@ public class ShardTest extends AbstractShardTest {
             final MutableCompositeModification modification = new MutableCompositeModification();
             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
-                    TestModel.TEST_PATH, containerNode, modification);
+                TestModel.TEST_PATH, containerNode, modification);
 
             final FiniteDuration duration = duration("5 seconds");
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1323,14 +1314,19 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testCommitWhenTransactionHasNoModifications(){
+    public void testCommitWhenTransactionHasNoModifications() {
+        testCommitWhenTransactionHasNoModifications(true);
+        testCommitWhenTransactionHasNoModifications(false);
+    }
+
+    public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
         // Note that persistence is enabled which would normally result in the entry getting written to the journal
         // but here that need not happen
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testCommitWhenTransactionHasNoModifications");
+                        "testCommitWhenTransactionHasNoModifications-" + readWrite);
 
                 waitUntilLeader(shard);
 
@@ -1344,11 +1340,7 @@ public class ShardTest extends AbstractShardTest {
 
                 final FiniteDuration duration = duration("5 seconds");
 
-                // Simulate the ForwardedReadyTransaction messages that would be sent
-                // by the ShardTransaction.
-
-                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                        cohort, modification, true, false), getRef());
+                shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
                 expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
@@ -1383,12 +1375,17 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testCommitWhenTransactionHasModifications(){
+    public void testCommitWhenTransactionHasModifications() {
+        testCommitWhenTransactionHasModifications(true);
+        testCommitWhenTransactionHasModifications(false);
+    }
+
+    public void testCommitWhenTransactionHasModifications(final boolean readWrite){
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testCommitWhenTransactionHasModifications");
+                        "testCommitWhenTransactionHasModifications-" + readWrite);
 
                 waitUntilLeader(shard);
 
@@ -1403,11 +1400,7 @@ public class ShardTest extends AbstractShardTest {
 
                 final FiniteDuration duration = duration("5 seconds");
 
-                // Simulate the ForwardedReadyTransaction messages that would be sent
-                // by the ShardTransaction.
-
-                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                        cohort, modification, true, false), getRef());
+                shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
                 expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
@@ -1443,10 +1436,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCommitPhaseFailure() throws Throwable {
+        testCommitPhaseFailure(true);
+        testCommitPhaseFailure(false);
+    }
+
+    public void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCommitPhaseFailure");
+                    "testCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1469,15 +1467,10 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1523,10 +1516,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testPreCommitPhaseFailure() throws Throwable {
+        testPreCommitPhaseFailure(true);
+        testPreCommitPhaseFailure(false);
+    }
+
+    public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testPreCommitPhaseFailure");
+                    "testPreCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1544,22 +1542,17 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
@@ -1597,10 +1590,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCanCommitPhaseFailure() throws Throwable {
+        testCanCommitPhaseFailure(true);
+        testCanCommitPhaseFailure(false);
+    }
+
+    public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCanCommitPhaseFailure");
+                    "testCanCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1611,11 +1609,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1630,8 +1624,7 @@ public class ShardTest extends AbstractShardTest {
             final String transactionID2 = "tx2";
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@@ -1645,10 +1638,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCanCommitPhaseFalseResponse() throws Throwable {
+        testCanCommitPhaseFalseResponse(true);
+        testCanCommitPhaseFalseResponse(false);
+    }
+
+    public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCanCommitPhaseFalseResponse");
+                    "testCanCommitPhaseFalseResponse-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1659,11 +1657,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1680,8 +1674,7 @@ public class ShardTest extends AbstractShardTest {
             final String transactionID2 = "tx2";
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@@ -1695,10 +1688,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
+        testImmediateCommitWithCanCommitPhaseFailure(true);
+        testImmediateCommitWithCanCommitPhaseFailure(false);
+    }
+
+    public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testImmediateCommitWithCanCommitPhaseFailure");
+                    "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1709,11 +1707,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
 
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
@@ -1731,8 +1725,7 @@ public class ShardTest extends AbstractShardTest {
             doReturn(candidateRoot).when(candidate).getRootNode();
             doReturn(candidate).when(cohort).getCandidate();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1742,10 +1735,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
+        testImmediateCommitWithCanCommitPhaseFalseResponse(true);
+        testImmediateCommitWithCanCommitPhaseFalseResponse(false);
+    }
+
+    public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testImmediateCommitWithCanCommitPhaseFalseResponse");
+                    "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1756,11 +1754,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
 
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
@@ -1778,8 +1772,7 @@ public class ShardTest extends AbstractShardTest {
             doReturn(candidateRoot).when(candidate).getRootNode();
             doReturn(candidate).when(cohort).getCandidate();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1789,10 +1782,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testAbortBeforeFinishCommit() throws Throwable {
+        testAbortBeforeFinishCommit(true);
+        testAbortBeforeFinishCommit(false);
+    }
+
+    public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortBeforeFinishCommit");
+                    "testAbortBeforeFinishCommit-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1825,13 +1823,12 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                     modification, preCommit);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
@@ -1850,12 +1847,17 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testTransactionCommitTimeout() throws Throwable {
+        testTransactionCommitTimeout(true);
+        testTransactionCommitTimeout(false);
+    }
+
+    public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testTransactionCommitTimeout");
+                    "testTransactionCommitTimeout-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1890,12 +1892,10 @@ public class ShardTest extends AbstractShardTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
@@ -1959,18 +1959,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // The 3rd Tx should exceed queue capacity and fail.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // canCommit 1st Tx.
@@ -2011,8 +2008,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             final String transactionID2 = "tx2";
@@ -2020,8 +2016,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             final String transactionID3 = "tx3";
@@ -2029,8 +2024,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
@@ -2063,8 +2057,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // CanCommit the first one so it's the current in-progress CohortEntry.
@@ -2079,14 +2072,13 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Ready the third Tx.
 
             final String transactionID3 = "tx3";
-            final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
+            final DataTreeModification modification3 = dataStore.newModification();
             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
                     .apply(modification3);
                 modification3.ready();
@@ -2127,10 +2119,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testAbortCurrentTransaction() throws Throwable {
+        testAbortCurrentTransaction(true);
+        testAbortCurrentTransaction(false);
+    }
+
+    public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortCurrentTransaction");
+                    "testAbortCurrentTransaction-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -2150,15 +2147,10 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -2194,6 +2186,11 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testAbortQueuedTransaction() throws Throwable {
+        testAbortQueuedTransaction(true);
+        testAbortQueuedTransaction(false);
+    }
+
+    public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
@@ -2201,8 +2198,7 @@ public class ShardTest extends AbstractShardTest {
             final Creator<Shard> creator = new Creator<Shard>() {
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+                    return new Shard(newShardBuilder()) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             super.onReceiveCommand(message);
@@ -2218,7 +2214,7 @@ public class ShardTest extends AbstractShardTest {
 
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
-                            Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+                            Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -2232,8 +2228,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Ready the tx.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
@@ -2297,9 +2292,8 @@ public class ShardTest extends AbstractShardTest {
         new ShardTestKit(getSystem()) {{
             class TestShard extends Shard {
 
-                protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
-                                    final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
-                    super(name, peerAddresses, datastoreContext, schemaContext);
+                protected TestShard(AbstractBuilder<?, ?> builder) {
+                    super(builder);
                     setPersistence(new TestPersistentDataProvider(super.persistence()));
                 }
 
@@ -2321,8 +2315,7 @@ public class ShardTest extends AbstractShardTest {
             final Creator<Shard> creator = new Creator<Shard>() {
                 @Override
                 public Shard create() throws Exception {
-                    return new TestShard(shardID, Collections.<String,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT);
+                    return new TestShard(newShardBuilder());
                 }
             };
 
@@ -2330,7 +2323,6 @@ public class ShardTest extends AbstractShardTest {
                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
 
             waitUntilLeader(shard);
-
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
@@ -2338,35 +2330,35 @@ public class ShardTest extends AbstractShardTest {
             // Trigger creation of a snapshot by ensuring
             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
-
-            assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
-
-            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
-                    savedSnapshot.get() instanceof Snapshot);
-
-            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
-
-            latch.set(new CountDownLatch(1));
-            savedSnapshot.set(null);
+            awaitAndValidateSnapshot(expectedRoot);
 
             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
+            awaitAndValidateSnapshot(expectedRoot);
 
-            assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }
 
-            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
-                    savedSnapshot.get() instanceof Snapshot);
+            private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
+                                              ) throws InterruptedException {
+                System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
+                assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
-            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+                assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+                        savedSnapshot.get() instanceof Snapshot);
 
-            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-        }
+                verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
 
-        private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
+                latch.set(new CountDownLatch(1));
+                savedSnapshot.set(null);
+            }
 
-            final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
-            assertEquals("Root node", expectedRoot, actual);
+            private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
 
-        }};
+                final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+                assertEquals("Root node", expectedRoot, actual);
+
+           }
+        };
     }
 
     /**
@@ -2376,7 +2368,7 @@ public class ShardTest extends AbstractShardTest {
      */
     @Test
     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
-        final DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         store.setSchemaContext(SCHEMA_CONTEXT);
 
         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
@@ -2405,14 +2397,14 @@ public class ShardTest extends AbstractShardTest {
         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
-        final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
-                persistentContext, SCHEMA_CONTEXT);
+        final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
+                schemaContext(SCHEMA_CONTEXT).props();
 
         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
-        final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
-                nonPersistentContext, SCHEMA_CONTEXT);
+        final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
+                schemaContext(SCHEMA_CONTEXT).props();
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
@@ -2448,12 +2440,12 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", false,
-                    shard.underlyingActor().persistence().isRecoveryApplicable());
+                shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().persistence().isRecoveryApplicable());
+                shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -2477,11 +2469,11 @@ public class ShardTest extends AbstractShardTest {
                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
 
                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
-                        ShardLeaderStateChanged.class);
+                    ShardLeaderStateChanged.class);
                 assertEquals("getLocalShardDataTree present", true,
                         leaderStateChanged.getLocalShardDataTree().isPresent());
                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
-                        leaderStateChanged.getLocalShardDataTree().get());
+                    leaderStateChanged.getLocalShardDataTree().get());
 
                 MessageCollectorActor.clearMessages(listener);
 
@@ -2521,4 +2513,167 @@ public class ShardTest extends AbstractShardTest {
         store.validate(modification);
         store.commit(store.prepare(modification));
     }
+
+    @Test
+    public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            String testName = "testClusteredDataChangeListenerDelayedRegistration";
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
+
+            final MockDataChangeListener listener = new MockDataChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                    actorFactory.generateActorId(testName + "-DataChangeListener"));
+
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    actorFactory.generateActorId(testName + "-shard"));
+
+            waitUntilNoLeader(shard);
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+            shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            shard.tell(new ElectionTimeout(), ActorRef.noSender());
+
+            listener.waitForChangeEvents();
+        }};
+    }
+
+    @Test
+    public void testClusteredDataChangeListenerRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            String testName = "testClusteredDataChangeListenerRegistration";
+            final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+            final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+            final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+                    Shard.builder().id(followerShardID).
+                        datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+                        peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+                            "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+            final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+                    Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+                        peerAddresses(Collections.singletonMap(followerShardID.toString(),
+                            "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+            leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+            String leaderPath = waitUntilLeader(followerShard);
+            assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+            final MockDataChangeListener listener = new MockDataChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                    actorFactory.generateActorId(testName + "-DataChangeListener"));
+
+            followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+            writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            listener.waitForChangeEvents();
+        }};
+    }
+
+    @Test
+    public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
+
+            final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+                    actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    actorFactory.generateActorId(testName + "-shard"));
+
+            waitUntilNoLeader(shard);
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+            final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                    RegisterDataTreeChangeListenerReply.class);
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            shard.tell(new ElectionTimeout(), ActorRef.noSender());
+
+            listener.waitForChangeEvents();
+        }};
+    }
+
+    @Test
+    public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            String testName = "testClusteredDataTreeChangeListenerRegistration";
+            final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+            final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+            final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+                    Shard.builder().id(followerShardID).
+                        datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+                        peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+                            "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+            final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+                    Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+                        peerAddresses(Collections.singletonMap(followerShardID.toString(),
+                            "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+            leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+            String leaderPath = waitUntilLeader(followerShard);
+            assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+            final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+                    actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+            followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+            final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                    RegisterDataTreeChangeListenerReply.class);
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+            writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            listener.waitForChangeEvents();
+        }};
+    }
+
+    @Test
+    public void testServerRemoved() throws Exception {
+        final TestActorRef<MessageCollectorActor> parent = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+
+        final ActorRef shard = parent.underlyingActor().context().actorOf(
+                newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                "testServerRemoved");
+
+        shard.tell(new ServerRemoved("test"), ActorRef.noSender());
+
+        MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
+
+    }
+
 }