Merge "BUG 2854 : Do not add empty read write transactions to the replicable journal"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index bbbc4db5e351f99df3856e807b146f18ac1899d1..a87000136fc52729d4e760dc2ffa86af84346392 100644 (file)
@@ -5,8 +5,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -24,7 +22,6 @@ import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -32,21 +29,16 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -61,40 +53,38 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -103,46 +93,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-public class ShardTest extends AbstractActorTest {
-
-    private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
-
-    private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
-
-    private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
-            .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
-
-    private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
-            shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
-            shardHeartbeatIntervalInMillis(100);
-
-    @Before
-    public void setUp() {
-        Builder newBuilder = DatastoreContext.newBuilder();
-        InMemorySnapshotStore.clear();
-        InMemoryJournal.clear();
-    }
-
-    @After
-    public void tearDown() {
-        InMemorySnapshotStore.clear();
-        InMemoryJournal.clear();
-    }
-
-    private DatastoreContext newDatastoreContext() {
-        return dataStoreContextBuilder.build();
-    }
-
-    private Props newShardProps() {
-        return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
-                newDatastoreContext(), SCHEMA_CONTEXT);
-    }
-
+public class ShardTest extends AbstractShardTest {
     @Test
     public void testRegisterChangeListener() throws Exception {
         new ShardTestKit(getSystem()) {{
@@ -390,36 +345,6 @@ public class ShardTest extends AbstractActorTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    @Test
-    public void testApplyHelium2VersionSnapshot() throws Exception {
-        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
-                "testApplySnapshot");
-
-        NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
-
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
-
-        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-        YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
-        NormalizedNode<?,?> expected = readStore(store, root);
-
-        NormalizedNodeMessages.Container encode = codec.encode(expected);
-
-        ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
-                encode.getNormalizedNode().toByteString().toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
-
-        shard.underlyingActor().onReceiveCommand(applySnapshot);
-
-        NormalizedNode<?,?> actual = readStore(shard, root);
-
-        assertEquals("Root node", expected, actual);
-
-        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-    }
-
     @Test
     public void testApplyState() throws Exception {
 
@@ -438,24 +363,6 @@ public class ShardTest extends AbstractActorTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    @Test
-    public void testApplyStateLegacy() throws Exception {
-
-        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
-
-        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
-                newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
-
-        shard.underlyingActor().onReceiveCommand(applyState);
-
-        NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
-        assertEquals("Applied state", node, actual);
-
-        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-    }
-
     @Test
     public void testRecovery() throws Exception {
 
@@ -474,7 +381,7 @@ public class ShardTest extends AbstractActorTest {
 
         // Set up the InMemoryJournal.
 
-        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
                   new WriteModification(TestModel.OUTER_LIST_PATH,
                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
@@ -498,141 +405,6 @@ public class ShardTest extends AbstractActorTest {
         testRecovery(listEntryKeys);
     }
 
-    @Test
-    public void testHelium2VersionRecovery() throws Exception {
-
-        // Set up the InMemorySnapshotStore.
-
-        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
-        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
-
-        writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-        NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
-
-        InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-                new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
-                                getNormalizedNode().toByteString().toByteArray(),
-                                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
-
-        // Set up the InMemoryJournal.
-
-        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
-                  new WriteModification(TestModel.OUTER_LIST_PATH,
-                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
-
-        int nListEntries = 16;
-        Set<Integer> listEntryKeys = new HashSet<>();
-        int i = 1;
-
-        // Add some CompositeModificationPayload entries
-        for(; i <= 8; i++) {
-            listEntryKeys.add(Integer.valueOf(i));
-            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
-            Modification mod = new MergeModification(path,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
-            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newLegacyPayload(mod)));
-        }
-
-        // Add some CompositeModificationByteStringPayload entries
-        for(; i <= nListEntries; i++) {
-            listEntryKeys.add(Integer.valueOf(i));
-            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
-            Modification mod = new MergeModification(path,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
-            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newLegacyByteStringPayload(mod)));
-        }
-
-        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
-
-        testRecovery(listEntryKeys);
-    }
-
-    private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
-        // Create the actor and wait for recovery complete.
-
-        int nListEntries = listEntryKeys.size();
-
-        final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
-        @SuppressWarnings("serial")
-        Creator<Shard> creator = new Creator<Shard>() {
-            @Override
-            public Shard create() throws Exception {
-                return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
-                        newDatastoreContext(), SCHEMA_CONTEXT) {
-                    @Override
-                    protected void onRecoveryComplete() {
-                        try {
-                            super.onRecoveryComplete();
-                        } finally {
-                            recoveryComplete.countDown();
-                        }
-                    }
-                };
-            }
-        };
-
-        TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(creator)), "testRecovery");
-
-        assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
-
-        // Verify data in the data store.
-
-        NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
-        assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
-        assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
-                outerList.getValue() instanceof Iterable);
-        for(Object entry: (Iterable<?>) outerList.getValue()) {
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
-                    entry instanceof MapEntryNode);
-            MapEntryNode mapEntry = (MapEntryNode)entry;
-            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
-                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
-            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
-            Object value = idLeaf.get().getValue();
-            assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
-                    listEntryKeys.remove(value));
-        }
-
-        if(!listEntryKeys.isEmpty()) {
-            fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
-                    listEntryKeys);
-        }
-
-        assertEquals("Last log index", nListEntries,
-                shard.underlyingActor().getShardMBean().getLastLogIndex());
-        assertEquals("Commit index", nListEntries,
-                shard.underlyingActor().getShardMBean().getCommitIndex());
-        assertEquals("Last applied", nListEntries,
-                shard.underlyingActor().getShardMBean().getLastApplied());
-
-        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-    }
-
-    private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
-        MutableCompositeModification compMod = new MutableCompositeModification();
-        for(Modification mod: mods) {
-            compMod.addModification(mod);
-        }
-
-        return new CompositeModificationPayload(compMod.toSerializable());
-    }
-
-    private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
-        MutableCompositeModification compMod = new MutableCompositeModification();
-        for(Modification mod: mods) {
-            compMod.addModification(mod);
-        }
-
-        return new CompositeModificationByteStringPayload(compMod.toSerializable());
-    }
-
     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
         MutableCompositeModification compMod = new MutableCompositeModification();
         for(Modification mod: mods) {
@@ -642,59 +414,6 @@ public class ShardTest extends AbstractActorTest {
         return new ModificationPayload(compMod);
     }
 
-    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
-            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
-            final MutableCompositeModification modification) {
-        return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
-    }
-
-    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
-            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
-            final MutableCompositeModification modification,
-            final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
-
-        DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
-        tx.write(path, data);
-        final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
-        DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
-
-        doAnswer(new Answer<ListenableFuture<Boolean>>() {
-            @Override
-            public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
-                return realCohort.canCommit();
-            }
-        }).when(cohort).canCommit();
-
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
-                if(preCommit != null) {
-                    return preCommit.apply(realCohort);
-                } else {
-                    return realCohort.preCommit();
-                }
-            }
-        }).when(cohort).preCommit();
-
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
-                return realCohort.commit();
-            }
-        }).when(cohort).commit();
-
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
-                return realCohort.abort();
-            }
-        }).when(cohort).abort();
-
-        modification.addModification(new WriteModification(path, data));
-
-        return cohort;
-    }
-
     @SuppressWarnings({ "unchecked" })
     @Test
     public void testConcurrentThreePhaseCommits() throws Throwable {
@@ -882,18 +601,6 @@ public class ShardTest extends AbstractActorTest {
         }};
     }
 
-    private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
-        for(int i = 0; i < 20 * 5; i++) {
-            long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
-            if(lastLogIndex == expectedValue) {
-                break;
-            }
-            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-        }
-
-        assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
-    }
-
     @Test
     public void testCommitWithPersistenceDisabled() throws Throwable {
         dataStoreContextBuilder.persistent(false);
@@ -947,6 +654,117 @@ public class ShardTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testCommitWhenTransactionHasNoModifications(){
+        // Note that persistence is enabled which would normally result in the entry getting written to the journal
+        // but here that need not happen
+        new ShardTestKit(getSystem()) {
+            {
+                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        "testCommitWhenTransactionHasNoModifications");
+
+                waitUntilLeader(shard);
+
+                String transactionID = "tx1";
+                MutableCompositeModification modification = new MutableCompositeModification();
+                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+
+                FiniteDuration duration = duration("5 seconds");
+
+                // Simulate the ForwardedReadyTransaction messages that would be sent
+                // by the ShardTransaction.
+
+                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                        cohort, modification, true), getRef());
+                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+                // Send the CanCommitTransaction message.
+
+                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+                InOrder inOrder = inOrder(cohort);
+                inOrder.verify(cohort).canCommit();
+                inOrder.verify(cohort).preCommit();
+                inOrder.verify(cohort).commit();
+
+                // Use MBean for verification
+                // Committed transaction count should increase as usual
+                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+
+                // Commit index should not advance because this does not go into the journal
+                assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
+
+                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+            }
+        };
+    }
+
+    @Test
+    public void testCommitWhenTransactionHasModifications(){
+        new ShardTestKit(getSystem()) {
+            {
+                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        "testCommitWhenTransactionHasModifications");
+
+                waitUntilLeader(shard);
+
+                String transactionID = "tx1";
+                MutableCompositeModification modification = new MutableCompositeModification();
+                modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
+                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+
+                FiniteDuration duration = duration("5 seconds");
+
+                // Simulate the ForwardedReadyTransaction messages that would be sent
+                // by the ShardTransaction.
+
+                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                        cohort, modification, true), getRef());
+                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+                // Send the CanCommitTransaction message.
+
+                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+                InOrder inOrder = inOrder(cohort);
+                inOrder.verify(cohort).canCommit();
+                inOrder.verify(cohort).preCommit();
+                inOrder.verify(cohort).commit();
+
+                // Use MBean for verification
+                // Committed transaction count should increase as usual
+                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+
+                // Commit index should advance as we do not have an empty modification
+                assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
+
+                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+            }
+        };
+    }
+
     @Test
     public void testCommitPhaseFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
@@ -1562,93 +1380,87 @@ public class ShardTest extends AbstractActorTest {
             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
         }};
-
     }
 
+    @Test
+    public void testOnDatastoreContext() {
+        new ShardTestKit(getSystem()) {{
+            dataStoreContextBuilder.persistent(true);
 
-    private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
-        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
-            transaction.read(YangInstanceIdentifier.builder().build());
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
 
-        Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
+            assertEquals("isRecoveryApplicable", true,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
 
-        NormalizedNode<?, ?> normalizedNode = optional.get();
+            waitUntilLeader(shard);
 
-        transaction.close();
+            shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
 
-        return normalizedNode;
-    }
+            assertEquals("isRecoveryApplicable", false,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
 
-    private void commitTransaction(final DOMStoreWriteTransaction transaction) {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        ListenableFuture<Void> future =
-            commitCohort.preCommit();
-        try {
-            future.get();
-            future = commitCohort.commit();
-            future.get();
-        } catch (InterruptedException | ExecutionException e) {
-        }
-    }
+            shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
 
-    private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
-        return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
-            @Override
-            public void onDataChanged(
-                final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+            assertEquals("isRecoveryApplicable", true,
+                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
 
-            }
-        };
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
     }
 
-    static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
-            throws ExecutionException, InterruptedException {
-        return readStore(shard.underlyingActor().getDataStore(), id);
-    }
+    @Test
+    public void testRegisterRoleChangeListener() throws Exception {
+        new ShardTestKit(getSystem()) {
+            {
+                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        "testRegisterRoleChangeListener");
 
-    public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
-            throws ExecutionException, InterruptedException {
-        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
+                waitUntilLeader(shard);
 
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-            transaction.read(id);
+                TestActorRef<MessageCollectorActor> listener =
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
 
-        Optional<NormalizedNode<?, ?>> optional = future.get();
-        NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
+                shard.tell(new RegisterRoleChangeListener(), listener);
 
-        transaction.close();
+                // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
+                // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
+                // sleep.
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
-        return node;
-    }
+                List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
 
-    static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
-        writeToStore(shard.underlyingActor().getDataStore(), id, node);
+                assertEquals(1, allMatching.size());
+            }
+        };
     }
 
-    public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
-        DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+    @Test
+    public void testFollowerInitialSyncStatus() throws Exception {
+        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                "testFollowerInitialSyncStatus");
 
-        transaction.write(id, node);
+        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
 
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
-    }
+        assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
 
-    @SuppressWarnings("serial")
-    private static final class DelegatingShardCreator implements Creator<Shard> {
-        private final Creator<Shard> delegate;
+        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
 
-        DelegatingShardCreator(final Creator<Shard> delegate) {
-            this.delegate = delegate;
-        }
+        assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
 
-        @Override
-        public Shard create() throws Exception {
-            return delegate.create();
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    private void commitTransaction(final DOMStoreWriteTransaction transaction) {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        ListenableFuture<Void> future =
+            commitCohort.preCommit();
+        try {
+            future.get();
+            future = commitCohort.commit();
+            future.get();
+        } catch (InterruptedException | ExecutionException e) {
         }
     }
 }