Merge "BUG 2854 : Do not add empty read write transactions to the replicable journal"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 9b4f77b7c42be1c841ee6575acb5764d90e78d5e..a87000136fc52729d4e760dc2ffa86af84346392 100644 (file)
@@ -1,26 +1,44 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
+import akka.japi.Procedure;
 import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
+import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.After;
-import org.junit.Before;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.mockito.InOrder;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -29,40 +47,44 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -71,93 +93,150 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.inOrder;
-
-public class ShardTest extends AbstractActorTest {
 
-    private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
+public class ShardTest extends AbstractShardTest {
+    @Test
+    public void testRegisterChangeListener() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps(),  "testRegisterChangeListener");
 
-    private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
-            .shardName("inventory").type("config").build();
+            waitUntilLeader(shard);
 
-    private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
+            shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
 
-    private static String shardName() {
-        return "shard" + NEXT_SHARD_NUM.getAndIncrement();
-    }
+            MockDataChangeListener listener = new MockDataChangeListener(1);
+            ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+                    "testRegisterChangeListener-DataChangeListener");
 
-    private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
-            shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).build();
+            shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
+                    dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
 
-    @Before
-    public void setUp() {
-        System.setProperty("shard.persistent", "false");
+            RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
+                    RegisterChangeListenerReply.class);
+            String replyPath = reply.getListenerRegistrationPath().toString();
+            assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
+                    "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
 
-        InMemorySnapshotStore.clear();
-        InMemoryJournal.clear();
-    }
+            YangInstanceIdentifier path = TestModel.TEST_PATH;
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-    @After
-    public void tearDown() {
-        InMemorySnapshotStore.clear();
-        InMemoryJournal.clear();
-    }
+            listener.waitForChangeEvents(path);
 
-    private Props newShardProps() {
-        return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
-                dataStoreContext, SCHEMA_CONTEXT);
+            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
     }
 
+    @SuppressWarnings("serial")
     @Test
-    public void testOnReceiveRegisterListener() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
+    public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
+        // This test tests the timing window in which a change listener is registered before the
+        // shard becomes the leader. We verify that the listener is registered and notified of the
+        // existing data when the shard becomes the leader.
+        new ShardTestKit(getSystem()) {{
+            // For this test, we want to send the RegisterChangeListener message after the shard
+            // has recovered from persistence and before it becomes the leader. So we subclass
+            // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
+            // we know that the shard has been initialized to a follower and has started the
+            // election process. The following 2 CountDownLatches are used to coordinate the
+            // ElectionTimeout with the sending of the RegisterChangeListener message.
+            final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+            final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+            Creator<Shard> creator = new Creator<Shard>() {
+                boolean firstElectionTimeout = true;
 
-            subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                            newDatastoreContext(), SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) throws Exception {
+                            if(message instanceof ElectionTimeout && firstElectionTimeout) {
+                                // Got the first ElectionTimeout. We don't forward it to the
+                                // base Shard yet until we've sent the RegisterChangeListener
+                                // message. So we signal the onFirstElectionTimeout latch to tell
+                                // the main thread to send the RegisterChangeListener message and
+                                // start a thread to wait on the onChangeListenerRegistered latch,
+                                // which the main thread signals after it has sent the message.
+                                // After the onChangeListenerRegistered is triggered, we send the
+                                // original ElectionTimeout message to proceed with the election.
+                                firstElectionTimeout = false;
+                                final ActorRef self = getSelf();
+                                new Thread() {
+                                    @Override
+                                    public void run() {
+                                        Uninterruptibles.awaitUninterruptibly(
+                                                onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+                                        self.tell(message, self);
+                                    }
+                                }.start();
+
+                                onFirstElectionTimeout.countDown();
+                            } else {
+                                super.onReceiveCommand(message);
+                            }
+                        }
+                    };
+                }
+            };
 
-            subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                    getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+            MockDataChangeListener listener = new MockDataChangeListener(1);
+            ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+                    "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
 
-            EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
-            assertEquals("isEnabled", false, enable.isEnabled());
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    Props.create(new DelegatingShardCreator(creator)),
+                    "testRegisterChangeListenerWhenNotLeaderInitially");
 
-            RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
+            // Write initial data into the in-memory store.
+            YangInstanceIdentifier path = TestModel.TEST_PATH;
+            writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            // Wait until the shard receives the first ElectionTimeout message.
+            assertEquals("Got first ElectionTimeout", true,
+                    onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+            // Now send the RegisterChangeListener and wait for the reply.
+            shard.tell(new RegisterChangeListener(path, dclActor.path(),
+                    AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+
+            RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                     RegisterChangeListenerReply.class);
-            assertTrue(reply.getListenerRegistrationPath().toString().matches(
-                    "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+            // Sanity check - verify the shard is not the leader yet.
+            shard.tell(new FindLeader(), getRef());
+            FindLeaderReply findLeadeReply =
+                    expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+            // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
+            // with the election process.
+            onChangeListenerRegistered.countDown();
+
+            // Wait for the shard to become the leader and notify our listener with the existing
+            // data in the store.
+            listener.waitForChangeEvents(path);
+
+            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
     public void testCreateTransaction(){
         new ShardTestKit(getSystem()) {{
-            ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
+            ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
 
-            waitUntilLeader(subject);
+            waitUntilLeader(shard);
 
-            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            subject.tell(new CreateTransaction("txn-1",
+            shard.tell(new CreateTransaction("txn-1",
                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
 
             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
@@ -166,18 +245,19 @@ public class ShardTest extends AbstractActorTest {
             String path = reply.getTransactionActorPath().toString();
             assertTrue("Unexpected transaction path " + path,
                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
-            expectNoMsg();
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
     public void testCreateTransactionOnChain(){
         new ShardTestKit(getSystem()) {{
-            final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
+            final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
 
-            waitUntilLeader(subject);
+            waitUntilLeader(shard);
 
-            subject.tell(new CreateTransaction("txn-1",
+            shard.tell(new CreateTransaction("txn-1",
                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
                     getRef());
 
@@ -187,81 +267,102 @@ public class ShardTest extends AbstractActorTest {
             String path = reply.getTransactionActorPath().toString();
             assertTrue("Unexpected transaction path " + path,
                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
-            expectNoMsg();
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
+    @SuppressWarnings("serial")
     @Test
-    public void testPeerAddressResolved(){
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
-
-            Props props = Shard.props(identifier,
-                    Collections.<ShardIdentifier, String>singletonMap(identifier, null),
-                    dataStoreContext, SCHEMA_CONTEXT);
-            final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
-
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+    public void testPeerAddressResolved() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            final CountDownLatch recoveryComplete = new CountDownLatch(1);
+            class TestShard extends Shard {
+                TestShard() {
+                    super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
+                            newDatastoreContext(), SCHEMA_CONTEXT);
+                }
 
-                    subject.tell(
-                        new PeerAddressResolved(identifier, "akka://foobar"),
-                        getRef());
+                Map<String, String> getPeerAddresses() {
+                    return getRaftActorContext().getPeerAddresses();
+                }
 
-                    expectNoMsg();
+                @Override
+                protected void onRecoveryComplete() {
+                    try {
+                        super.onRecoveryComplete();
+                    } finally {
+                        recoveryComplete.countDown();
+                    }
                 }
-            };
+            }
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    Props.create(new DelegatingShardCreator(new Creator<Shard>() {
+                        @Override
+                        public TestShard create() throws Exception {
+                            return new TestShard();
+                        }
+                    })), "testPeerAddressResolved");
+
+            //waitUntilLeader(shard);
+            assertEquals("Recovery complete", true,
+                    Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+
+            String address = "akka://foobar";
+            shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
+
+            assertEquals("getPeerAddresses", address,
+                    ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
-    public void testApplySnapshot() throws ExecutionException, InterruptedException {
-        TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
+    public void testApplySnapshot() throws Exception {
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
+                "testApplySnapshot");
 
-        NormalizedNodeToNodeCodec codec =
-            new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
 
-        writeToStore(ref, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
-        NormalizedNode<?,?> expected = readStore(ref, root);
-
-        NormalizedNodeMessages.Container encode = codec.encode(expected);
+        NormalizedNode<?,?> expected = readStore(store, root);
 
         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
-                encode.getNormalizedNode().toByteString().toByteArray(),
+                SerializationUtils.serializeNormalizedNode(expected),
                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
 
-        ref.underlyingActor().onReceiveCommand(applySnapshot);
+        shard.underlyingActor().onReceiveCommand(applySnapshot);
 
-        NormalizedNode<?,?> actual = readStore(ref, root);
+        NormalizedNode<?,?> actual = readStore(shard, root);
 
-        assertEquals(expected, actual);
+        assertEquals("Root node", expected, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
     @Test
     public void testApplyState() throws Exception {
 
-        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
 
         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        MutableCompositeModification compMod = new MutableCompositeModification();
-        compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
-        Payload payload = new CompositeModificationPayload(compMod.toSerializable());
-        ApplyState applyState = new ApplyState(null, "test",
-                new ReplicatedLogImplEntry(1, 2, payload));
+        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+                newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
 
         shard.underlyingActor().onReceiveCommand(applyState);
 
         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
         assertEquals("Applied state", node, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    @SuppressWarnings("serial")
     @Test
     public void testRecovery() throws Exception {
 
@@ -270,170 +371,56 @@ public class ShardTest extends AbstractActorTest {
         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
 
-        DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
-        writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-        DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
+        writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-        DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
-        NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
+        NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
 
-        InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
-                new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
-                        root).
-                                getNormalizedNode().toByteString().toByteArray(),
-                                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+        InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
+                SerializationUtils.serializeNormalizedNode(root),
+                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
 
         // Set up the InMemoryJournal.
 
-        InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
                   new WriteModification(TestModel.OUTER_LIST_PATH,
-                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
-                          SCHEMA_CONTEXT))));
+                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
-        int nListEntries = 11;
+        int nListEntries = 16;
         Set<Integer> listEntryKeys = new HashSet<>();
+
+        // Add some ModificationPayload entries
         for(int i = 1; i <= nListEntries; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
-                    SCHEMA_CONTEXT);
-            InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newPayload(mod)));
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newModificationPayload(mod)));
         }
 
-        InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
-                new ApplyLogEntries(nListEntries));
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+                new ApplyJournalEntries(nListEntries));
 
-        // Create the actor and wait for recovery complete.
-
-        final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
-        Creator<Shard> creator = new Creator<Shard>() {
-            @Override
-            public Shard create() throws Exception {
-                return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
-                        dataStoreContext, SCHEMA_CONTEXT) {
-                    @Override
-                    protected void onRecoveryComplete() {
-                        try {
-                            super.onRecoveryComplete();
-                        } finally {
-                            recoveryComplete.countDown();
-                        }
-                    }
-                };
-            }
-        };
-
-        TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(creator)), "testRecovery");
-
-        assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
-
-        // Verify data in the data store.
-
-        NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
-        assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
-        assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
-                outerList.getValue() instanceof Iterable);
-        for(Object entry: (Iterable<?>) outerList.getValue()) {
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
-                    entry instanceof MapEntryNode);
-            MapEntryNode mapEntry = (MapEntryNode)entry;
-            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
-                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
-            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
-            Object value = idLeaf.get().getValue();
-            assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
-                    listEntryKeys.remove(value));
-        }
-
-        if(!listEntryKeys.isEmpty()) {
-            fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
-                    listEntryKeys);
-        }
-
-        assertEquals("Last log index", nListEntries,
-                shard.underlyingActor().getShardMBean().getLastLogIndex());
-        assertEquals("Commit index", nListEntries,
-                shard.underlyingActor().getShardMBean().getCommitIndex());
-        assertEquals("Last applied", nListEntries,
-                shard.underlyingActor().getShardMBean().getLastApplied());
+        testRecovery(listEntryKeys);
     }
 
-    private CompositeModificationPayload newPayload(Modification... mods) {
+    private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
         MutableCompositeModification compMod = new MutableCompositeModification();
         for(Modification mod: mods) {
             compMod.addModification(mod);
         }
 
-        return new CompositeModificationPayload(compMod.toSerializable());
-    }
-
-    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
-            InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
-            MutableCompositeModification modification) {
-        return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
-    }
-
-    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
-            InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
-            MutableCompositeModification modification,
-            final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
-
-        DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
-        tx.write(path, data);
-        final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
-        DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
-
-        doAnswer(new Answer<ListenableFuture<Boolean>>() {
-            @Override
-            public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
-                return realCohort.canCommit();
-            }
-        }).when(cohort).canCommit();
-
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
-                if(preCommit != null) {
-                    return preCommit.apply(realCohort);
-                } else {
-                    return realCohort.preCommit();
-                }
-            }
-        }).when(cohort).preCommit();
-
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
-                return realCohort.commit();
-            }
-        }).when(cohort).commit();
-
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
-                return realCohort.abort();
-            }
-        }).when(cohort).abort();
-
-        modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
-
-        return cohort;
+        return new ModificationPayload(compMod);
     }
 
     @SuppressWarnings({ "unchecked" })
     @Test
     public void testConcurrentThreePhaseCommits() throws Throwable {
-        System.setProperty("shard.persistent", "true");
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testConcurrentThreePhaseCommits");
 
             waitUntilLeader(shard);
 
@@ -468,7 +455,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
@@ -482,10 +470,12 @@ public class ShardTest extends AbstractActorTest {
 
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
@@ -511,14 +501,13 @@ public class ShardTest extends AbstractActorTest {
             class OnFutureComplete extends OnComplete<Object> {
                 private final Class<?> expRespType;
 
-                OnFutureComplete(Class<?> expRespType) {
+                OnFutureComplete(final Class<?> expRespType) {
                     this.expRespType = expRespType;
                 }
 
                 @Override
-                public void onComplete(Throwable error, Object resp) {
+                public void onComplete(final Throwable error, final Object resp) {
                     if(error != null) {
-                        System.out.println(new java.util.Date()+": "+getClass().getSimpleName() + " failure: "+error);
                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
                     } else {
                         try {
@@ -530,7 +519,7 @@ public class ShardTest extends AbstractActorTest {
                     }
                 }
 
-                void onSuccess(Object resp) throws Exception {
+                void onSuccess(final Object resp) throws Exception {
                 }
             }
 
@@ -540,7 +529,7 @@ public class ShardTest extends AbstractActorTest {
                 }
 
                 @Override
-                public void onComplete(Throwable error, Object resp) {
+                public void onComplete(final Throwable error, final Object resp) {
                     super.onComplete(error, resp);
                     commitLatch.countDown();
                 }
@@ -549,13 +538,13 @@ public class ShardTest extends AbstractActorTest {
             class OnCanCommitFutureComplete extends OnFutureComplete {
                 private final String transactionID;
 
-                OnCanCommitFutureComplete(String transactionID) {
+                OnCanCommitFutureComplete(final String transactionID) {
                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
                     this.transactionID = transactionID;
                 }
 
                 @Override
-                void onSuccess(Object resp) throws Exception {
+                void onSuccess(final Object resp) throws Exception {
                     CanCommitTransactionReply canCommitReply =
                             CanCommitTransactionReply.fromSerializable(resp);
                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
@@ -606,15 +595,182 @@ public class ShardTest extends AbstractActorTest {
             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
 
-            assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
+            verifyLastLogIndex(shard, 2);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
+    @Test
+    public void testCommitWithPersistenceDisabled() throws Throwable {
+        dataStoreContextBuilder.persistent(false);
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCommitPhaseFailure");
+
+            waitUntilLeader(shard);
+
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            // Setup a simulated transactions with a mock cohort.
+
+            String transactionID = "tx";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+                    TestModel.TEST_PATH, containerNode, modification);
+
+            FiniteDuration duration = duration("5 seconds");
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the CanCommitTransaction message.
+
+            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+            // Send the CanCommitTransaction message.
+
+            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            InOrder inOrder = inOrder(cohort);
+            inOrder.verify(cohort).canCommit();
+            inOrder.verify(cohort).preCommit();
+            inOrder.verify(cohort).commit();
+
+            NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
+            assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testCommitWhenTransactionHasNoModifications(){
+        // Note that persistence is enabled which would normally result in the entry getting written to the journal
+        // but here that need not happen
+        new ShardTestKit(getSystem()) {
+            {
+                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        "testCommitWhenTransactionHasNoModifications");
+
+                waitUntilLeader(shard);
+
+                String transactionID = "tx1";
+                MutableCompositeModification modification = new MutableCompositeModification();
+                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+
+                FiniteDuration duration = duration("5 seconds");
+
+                // Simulate the ForwardedReadyTransaction messages that would be sent
+                // by the ShardTransaction.
+
+                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                        cohort, modification, true), getRef());
+                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+                // Send the CanCommitTransaction message.
+
+                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+                InOrder inOrder = inOrder(cohort);
+                inOrder.verify(cohort).canCommit();
+                inOrder.verify(cohort).preCommit();
+                inOrder.verify(cohort).commit();
+
+                // Use MBean for verification
+                // Committed transaction count should increase as usual
+                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+
+                // Commit index should not advance because this does not go into the journal
+                assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
+
+                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+            }
+        };
+    }
+
+    @Test
+    public void testCommitWhenTransactionHasModifications(){
+        new ShardTestKit(getSystem()) {
+            {
+                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        "testCommitWhenTransactionHasModifications");
+
+                waitUntilLeader(shard);
+
+                String transactionID = "tx1";
+                MutableCompositeModification modification = new MutableCompositeModification();
+                modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
+                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+
+                FiniteDuration duration = duration("5 seconds");
+
+                // Simulate the ForwardedReadyTransaction messages that would be sent
+                // by the ShardTransaction.
+
+                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                        cohort, modification, true), getRef());
+                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+                // Send the CanCommitTransaction message.
+
+                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+                InOrder inOrder = inOrder(cohort);
+                inOrder.verify(cohort).canCommit();
+                inOrder.verify(cohort).preCommit();
+                inOrder.verify(cohort).commit();
+
+                // Use MBean for verification
+                // Committed transaction count should increase as usual
+                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+
+                // Commit index should advance as we do not have an empty modification
+                assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
+
+                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+            }
+        };
+    }
+
     @Test
     public void testCommitPhaseFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCommitPhaseFailure");
 
             waitUntilLeader(shard);
 
@@ -639,10 +795,12 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -669,7 +827,7 @@ public class ShardTest extends AbstractActorTest {
             final CountDownLatch latch = new CountDownLatch(1);
             canCommitFuture.onComplete(new OnComplete<Object>() {
                 @Override
-                public void onComplete(Throwable t, Object resp) {
+                public void onComplete(final Throwable t, final Object resp) {
                     latch.countDown();
                 }
             }, getSystem().dispatcher());
@@ -681,6 +839,8 @@ public class ShardTest extends AbstractActorTest {
             inOrder.verify(cohort1).preCommit();
             inOrder.verify(cohort1).commit();
             inOrder.verify(cohort2).canCommit();
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -688,7 +848,8 @@ public class ShardTest extends AbstractActorTest {
     public void testPreCommitPhaseFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testPreCommitPhaseFailure");
 
             waitUntilLeader(shard);
 
@@ -703,7 +864,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message.
@@ -722,6 +884,8 @@ public class ShardTest extends AbstractActorTest {
             InOrder inOrder = inOrder(cohort);
             inOrder.verify(cohort).canCommit();
             inOrder.verify(cohort).preCommit();
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -729,7 +893,8 @@ public class ShardTest extends AbstractActorTest {
     public void testCanCommitPhaseFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCanCommitPhaseFailure");
 
             waitUntilLeader(shard);
 
@@ -743,46 +908,47 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message.
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
     public void testAbortBeforeFinishCommit() throws Throwable {
-        System.setProperty("shard.persistent", "true");
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testAbortBeforeFinishCommit");
 
             waitUntilLeader(shard);
 
             final FiniteDuration duration = duration("5 seconds");
-            final Timeout timeout = new Timeout(duration);
-
             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 
             final String transactionID = "tx1";
-            final CountDownLatch abortComplete = new CountDownLatch(1);
             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
                 @Override
                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
 
-                    Future<Object> abortFuture = Patterns.ask(shard,
-                            new AbortTransaction(transactionID).toSerializable(), timeout);
-                    abortFuture.onComplete(new OnComplete<Object>() {
-                        @Override
-                        public void onComplete(Throwable e, Object resp) {
-                            abortComplete.countDown();
-                        }
-                    }, getSystem().dispatcher());
+                    // Simulate an AbortTransaction message occurring during replication, after
+                    // persisting and before finishing the commit to the in-memory store.
+                    // We have no followers so due to optimizations in the RaftActor, it does not
+                    // attempt replication and thus we can't send an AbortTransaction message b/c
+                    // it would be processed too late after CommitTransaction completes. So we'll
+                    // simulate an AbortTransaction message occurring during replication by calling
+                    // the shard directly.
+                    //
+                    shard.underlyingActor().doAbortTransaction(transactionID, null);
 
                     return preCommitFuture;
                 }
@@ -793,7 +959,8 @@ public class ShardTest extends AbstractActorTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                     modification, preCommit);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
@@ -801,25 +968,28 @@ public class ShardTest extends AbstractActorTest {
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            Future<Object> commitFuture = Patterns.ask(shard,
-                    new CommitTransaction(transactionID).toSerializable(), timeout);
-
-            assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
-
-            Await.result(commitFuture, duration);
+            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
+
+            // Since we're simulating an abort occurring during replication and before finish commit,
+            // the data should still get written to the in-memory store since we've gotten past
+            // canCommit and preCommit and persisted the data.
             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
     public void testTransactionCommitTimeout() throws Throwable {
-        dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
+        dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionCommitTimeout");
 
             waitUntilLeader(shard);
 
@@ -854,10 +1024,12 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
@@ -877,16 +1049,19 @@ public class ShardTest extends AbstractActorTest {
 
             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
             assertNotNull(listNodePath + " not found", node);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
-        dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
+        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionCommitQueueCapacityExceeded");
 
             waitUntilLeader(shard);
 
@@ -913,13 +1088,16 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // canCommit 1st Tx.
@@ -935,6 +1113,8 @@ public class ShardTest extends AbstractActorTest {
 
             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -942,10 +1122,13 @@ public class ShardTest extends AbstractActorTest {
     public void testCanCommitBeforeReadyFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCanCommitBeforeReadyFailure");
 
             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
@@ -953,7 +1136,8 @@ public class ShardTest extends AbstractActorTest {
     public void testAbortTransaction() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testAbortTransaction");
 
             waitUntilLeader(shard);
 
@@ -976,10 +1160,12 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1003,34 +1189,88 @@ public class ShardTest extends AbstractActorTest {
 
             // Wait for the 2nd Tx to complete the canCommit phase.
 
-            final CountDownLatch latch = new CountDownLatch(1);
-            canCommitFuture.onComplete(new OnComplete<Object>() {
-                @Override
-                public void onComplete(Throwable t, Object resp) {
-                    latch.countDown();
-                }
-            }, getSystem().dispatcher());
-
-            assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+            Await.ready(canCommitFuture, duration);
 
             InOrder inOrder = inOrder(cohort1, cohort2);
             inOrder.verify(cohort1).canCommit();
             inOrder.verify(cohort2).canCommit();
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
-    public void testCreateSnapshot() throws IOException, InterruptedException {
+    public void testCreateSnapshot() throws Exception {
+        testCreateSnapshot(true, "testCreateSnapshot");
+    }
+
+    @Test
+    public void testCreateSnapshotWithNonPersistentData() throws Exception {
+        testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
+    }
+
+    @SuppressWarnings("serial")
+    public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
+
+        final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
+        class DelegatingPersistentDataProvider implements DataPersistenceProvider {
+            DataPersistenceProvider delegate;
+
+            DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
+                this.delegate = delegate;
+            }
+
+            @Override
+            public boolean isRecoveryApplicable() {
+                return delegate.isRecoveryApplicable();
+            }
+
+            @Override
+            public <T> void persist(T o, Procedure<T> procedure) {
+                delegate.persist(o, procedure);
+            }
+
+            @Override
+            public void saveSnapshot(Object o) {
+                savedSnapshot.set(o);
+                delegate.saveSnapshot(o);
+            }
+
+            @Override
+            public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+                delegate.deleteSnapshots(criteria);
+            }
+
+            @Override
+            public void deleteMessages(long sequenceNumber) {
+                delegate.deleteMessages(sequenceNumber);
+            }
+        }
+
+        dataStoreContextBuilder.persistent(persistent);
+
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
             Creator<Shard> creator = new Creator<Shard>() {
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
-                            dataStoreContext, SCHEMA_CONTEXT) {
+                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                            newDatastoreContext(), SCHEMA_CONTEXT) {
+
+                        DelegatingPersistentDataProvider delegating;
+
+                        @Override
+                        protected DataPersistenceProvider persistence() {
+                            if(delegating == null) {
+                                delegating = new DelegatingPersistentDataProvider(super.persistence());
+                            }
+
+                            return delegating;
+                        }
+
                         @Override
-                        public void saveSnapshot(Object snapshot) {
-                            super.saveSnapshot(snapshot);
+                        protected void commitSnapshot(final long sequenceNumber) {
+                            super.commitSnapshot(sequenceNumber);
                             latch.get().countDown();
                         }
                     };
@@ -1038,18 +1278,44 @@ public class ShardTest extends AbstractActorTest {
             };
 
             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot");
+                    Props.create(new DelegatingShardCreator(creator)), shardActorName);
 
             waitUntilLeader(shard);
 
-            shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+            writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
+
+            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
+            shard.tell(capture, getRef());
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
+            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+                    savedSnapshot.get() instanceof Snapshot);
+
+            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+
             latch.set(new CountDownLatch(1));
-            shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+            savedSnapshot.set(null);
+
+            shard.tell(capture, getRef());
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+
+            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+                    savedSnapshot.get() instanceof Snapshot);
+
+            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }
+
+        private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
+
+            NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+            assertEquals("Root node", expectedRoot, actual);
+
         }};
     }
 
@@ -1059,8 +1325,7 @@ public class ShardTest extends AbstractActorTest {
      */
     @Test
     public void testInMemoryDataStoreRestore() throws ReadFailedException {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
-            MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
 
         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
 
@@ -1070,7 +1335,7 @@ public class ShardTest extends AbstractActorTest {
         commitTransaction(putTransaction);
 
 
-        NormalizedNode expected = readStore(store);
+        NormalizedNode<?, ?> expected = readStore(store);
 
         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
 
@@ -1079,84 +1344,123 @@ public class ShardTest extends AbstractActorTest {
 
         commitTransaction(writeTransaction);
 
-        NormalizedNode actual = readStore(store);
+        NormalizedNode<?, ?> actual = readStore(store);
 
         assertEquals(expected, actual);
-
     }
 
-    private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
-        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
-            transaction.read(YangInstanceIdentifier.builder().build());
+    @Test
+    public void testRecoveryApplicable(){
 
-        Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
+        final DatastoreContext persistentContext = DatastoreContext.newBuilder().
+                shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
-        NormalizedNode<?, ?> normalizedNode = optional.get();
+        final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+                persistentContext, SCHEMA_CONTEXT);
 
-        transaction.close();
+        final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
+                shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
-        return normalizedNode;
-    }
+        final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+                nonPersistentContext, SCHEMA_CONTEXT);
 
-    private void commitTransaction(DOMStoreWriteTransaction transaction) {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        ListenableFuture<Void> future =
-            commitCohort.preCommit();
-        try {
-            future.get();
-            future = commitCohort.commit();
-            future.get();
-        } catch (InterruptedException | ExecutionException e) {
-        }
-    }
+        new ShardTestKit(getSystem()) {{
+            TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
+                    persistentProps, "testPersistence1");
 
-    private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
-        return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
-            @Override
-            public void onDataChanged(
-                AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+            assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
 
-            }
-        };
+            shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+            TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
+                    nonPersistentProps, "testPersistence2");
+
+            assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+        }};
     }
 
-    private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
-            throws ExecutionException, InterruptedException {
-        DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
+    @Test
+    public void testOnDatastoreContext() {
+        new ShardTestKit(getSystem()) {{
+            dataStoreContextBuilder.persistent(true);
+
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
+
+            assertEquals("isRecoveryApplicable", true,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            waitUntilLeader(shard);
 
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-            transaction.read(id);
+            shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
 
-        Optional<NormalizedNode<?, ?>> optional = future.get();
-        NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
+            assertEquals("isRecoveryApplicable", false,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
 
-        transaction.close();
+            shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
 
-        return node;
+            assertEquals("isRecoveryApplicable", true,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
     }
 
-    private void writeToStore(TestActorRef<Shard> shard, YangInstanceIdentifier id, NormalizedNode<?,?> node)
-        throws ExecutionException, InterruptedException {
-        DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
+    @Test
+    public void testRegisterRoleChangeListener() throws Exception {
+        new ShardTestKit(getSystem()) {
+            {
+                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        "testRegisterRoleChangeListener");
 
-        transaction.write(id, node);
+                waitUntilLeader(shard);
 
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
+                TestActorRef<MessageCollectorActor> listener =
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+                shard.tell(new RegisterRoleChangeListener(), listener);
+
+                // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
+                // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
+                // sleep.
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+                List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
+
+                assertEquals(1, allMatching.size());
+            }
+        };
     }
 
-    private static final class DelegatingShardCreator implements Creator<Shard> {
-        private final Creator<Shard> delegate;
+    @Test
+    public void testFollowerInitialSyncStatus() throws Exception {
+        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                "testFollowerInitialSyncStatus");
 
-        DelegatingShardCreator(Creator<Shard> delegate) {
-            this.delegate = delegate;
-        }
+        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
+
+        assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
+
+        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
+
+        assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
 
-        @Override
-        public Shard create() throws Exception {
-            return delegate.create();
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    private void commitTransaction(final DOMStoreWriteTransaction transaction) {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        ListenableFuture<Void> future =
+            commitCohort.preCommit();
+        try {
+            future.get();
+            future = commitCohort.commit();
+            future.get();
+        } catch (InterruptedException | ExecutionException e) {
         }
     }
 }