Merge "BUG 3045 : Use non-strict parsing in hello message."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index efcb0c9bfd6da4ae3cf0dce5681d3931eaa1271e..3d28672c9fd08906e2fba0a063dbcf10e4326ced 100644 (file)
@@ -8,6 +8,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -16,16 +17,14 @@ import akka.actor.Props;
 import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
-import akka.japi.Procedure;
 import akka.pattern.Patterns;
-import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.SaveSnapshotSuccess;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
@@ -34,13 +33,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -87,25 +86,32 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 public class ShardTest extends AbstractShardTest {
+    private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
 
     @Test
     public void testRegisterChangeListener() throws Exception {
@@ -160,8 +166,12 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
+                    // Use a non persistent provider because this test actually invokes persist on the journal
+                    // this will cause all other messages to not be queued properly after that.
+                    // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
+                    // it does do a persist)
                     return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT) {
+                            dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
@@ -333,8 +343,8 @@ public class ShardTest extends AbstractShardTest {
         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
                 "testApplySnapshot");
 
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+        DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        store.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -373,12 +383,27 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testRecovery() throws Exception {
+    public void testApplyStateWithCandidatePayload() throws Exception {
 
-        // Set up the InMemorySnapshotStore.
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+
+        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
+
+        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+                DataTreeCandidatePayload.create(candidate)));
+
+        shard.underlyingActor().onReceiveCommand(applyState);
+
+        NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
+        assertEquals("Applied state", node, actual);
 
-        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
-        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
+        DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+        testStore.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -387,6 +412,55 @@ public class ShardTest extends AbstractShardTest {
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
                 SerializationUtils.serializeNormalizedNode(root),
                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+        return testStore;
+    }
+
+    private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
+        source.validate(mod);
+        final DataTreeCandidate candidate = source.prepare(mod);
+        source.commit(candidate);
+        return DataTreeCandidatePayload.create(candidate);
+    }
+
+    @Test
+    public void testDataTreeCandidateRecovery() throws Exception {
+        // Set up the InMemorySnapshotStore.
+        final DataTree source = setupInMemorySnapshotStore();
+
+        final DataTreeModification writeMod = source.takeSnapshot().newModification();
+        writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+        // Set up the InMemoryJournal.
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
+
+        int nListEntries = 16;
+        Set<Integer> listEntryKeys = new HashSet<>();
+
+        // Add some ModificationPayload entries
+        for (int i = 1; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+
+            final DataTreeModification mod = source.takeSnapshot().newModification();
+            mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
+
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                payloadForModification(source, mod)));
+        }
+
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+                new ApplyJournalEntries(nListEntries));
+
+        testRecovery(listEntryKeys);
+    }
+
+    @Test
+    public void testModicationRecovery() throws Exception {
+
+        // Set up the InMemorySnapshotStore.
+        setupInMemorySnapshotStore();
 
         // Set up the InMemoryJournal.
 
@@ -414,7 +488,7 @@ public class ShardTest extends AbstractShardTest {
         testRecovery(listEntryKeys);
     }
 
-    private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
+    private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
         MutableCompositeModification compMod = new MutableCompositeModification();
         for(Modification mod: mods) {
             compMod.addModification(mod);
@@ -423,7 +497,6 @@ public class ShardTest extends AbstractShardTest {
         return new ModificationPayload(compMod);
     }
 
-    @SuppressWarnings({ "unchecked" })
     @Test
     public void testConcurrentThreePhaseCommits() throws Throwable {
         new ShardTestKit(getSystem()) {{
@@ -433,42 +506,42 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            final String transactionID1 = "tx1";
-            final String transactionID2 = "tx2";
-            final String transactionID3 = "tx3";
+         // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
 
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
-                    if(transactionID.equals(transactionID1)) {
-                        mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
-                        return mockCohort1.get();
-                    } else if(transactionID.equals(transactionID2)) {
-                        mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
-                        return mockCohort2.get();
-                    } else {
-                        mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
-                        return mockCohort3.get();
-                    }
-                }
-            };
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification3);
 
             long timeoutSec = 5;
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
-            // Send a BatchedModifications message for the first transaction.
+            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
-            assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
-            assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
+                    expectMsgClass(duration, ReadyTransactionReply.class));
+            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -477,16 +550,15 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send BatchedModifications for the next 2 Tx's.
+            // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
             // processed after the first Tx completes.
@@ -579,31 +651,20 @@ public class ShardTest extends AbstractShardTest {
 
             assertEquals("Commits complete", true, done);
 
-            InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
-            inOrder.verify(mockCohort1.get()).canCommit();
-            inOrder.verify(mockCohort1.get()).preCommit();
-            inOrder.verify(mockCohort1.get()).commit();
-            inOrder.verify(mockCohort2.get()).canCommit();
-            inOrder.verify(mockCohort2.get()).preCommit();
-            inOrder.verify(mockCohort2.get()).commit();
-            inOrder.verify(mockCohort3.get()).canCommit();
-            inOrder.verify(mockCohort3.get()).preCommit();
-            inOrder.verify(mockCohort3.get()).commit();
+            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort1).preCommit();
+            inOrder.verify(cohort1).commit();
+            inOrder.verify(cohort2).canCommit();
+            inOrder.verify(cohort2).preCommit();
+            inOrder.verify(cohort2).commit();
+            inOrder.verify(cohort3).canCommit();
+            inOrder.verify(cohort3).preCommit();
+            inOrder.verify(cohort3).commit();
 
             // Verify data in the data store.
 
-            NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
-            assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
-                    outerList.getValue() instanceof Iterable);
-            Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
-                       entry instanceof MapEntryNode);
-            MapEntryNode mapEntry = (MapEntryNode)entry;
-            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
-                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
-            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
-            assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+            verifyOuterListEntry(shard, 1);
 
             verifyLastApplied(shard, 2);
 
@@ -611,36 +672,36 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
-    private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data, boolean ready) {
-        return newBatchedModifications(transactionID, null, path, data, ready);
+    private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
+            NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
+        return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
     }
 
-    private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
-            YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
+    private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
+            YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
         batched.addModification(new WriteModification(path, data));
         batched.setReady(ready);
+        batched.setDoCommitOnReady(doCommitOnReady);
         return batched;
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testMultipleBatchedModifications() throws Throwable {
+    public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testMultipleBatchedModifications");
+                    "testBatchedModificationsWithNoCommitOnReady");
 
             waitUntilLeader(shard);
 
             final String transactionID = "tx";
             FiniteDuration duration = duration("5 seconds");
 
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+            final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
                 @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+                public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
                     if(mockCohort.get() == null) {
                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
                     }
@@ -654,19 +715,19 @@ public class ShardTest extends AbstractShardTest {
             // Send a BatchedModifications to start a transaction.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send a couple more BatchedModifications.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -687,23 +748,85 @@ public class ShardTest extends AbstractShardTest {
 
             // Verify data in the data store.
 
-            NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
-            assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
-                    outerList.getValue() instanceof Iterable);
-            Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
-                       entry instanceof MapEntryNode);
-            MapEntryNode mapEntry = (MapEntryNode)entry;
-            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
-                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
-            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
-            assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+            verifyOuterListEntry(shard, 1);
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
+    @Test
+    public void testBatchedModificationsWithCommitOnReady() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testBatchedModificationsWithCommitOnReady");
+
+            waitUntilLeader(shard);
+
+            final String transactionID = "tx";
+            FiniteDuration duration = duration("5 seconds");
+
+            final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
+            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+                @Override
+                public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
+                    if(mockCohort.get() == null) {
+                        mockCohort.set(createDelegatingMockCohort("cohort", actual));
+                    }
+
+                    return mockCohort.get();
+                }
+            };
+
+            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
+            // Send a BatchedModifications to start a transaction.
+
+            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
+
+            // Send a couple more BatchedModifications.
+
+            shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
+
+            shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
+                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            InOrder inOrder = inOrder(mockCohort.get());
+            inOrder.verify(mockCohort.get()).canCommit();
+            inOrder.verify(mockCohort.get()).preCommit();
+            inOrder.verify(mockCohort.get()).commit();
+
+            // Verify data in the data store.
+
+            verifyOuterListEntry(shard, 1);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @SuppressWarnings("unchecked")
+    private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
+        NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
+        assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+        assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+                outerList.getValue() instanceof Iterable);
+        Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
+        assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+                entry instanceof MapEntryNode);
+        MapEntryNode mapEntry = (MapEntryNode)entry;
+        Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+                mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+        assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+        assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
+    }
+
     @Test
     public void testBatchedModificationsOnTransactionChain() throws Throwable {
         new ShardTestKit(getSystem()) {{
@@ -724,8 +847,8 @@ public class ShardTest extends AbstractShardTest {
             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
-                    containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+                    containerNode, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Create a read Tx on the same chain.
 
@@ -762,6 +885,8 @@ public class ShardTest extends AbstractShardTest {
         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
         new ShardTestKit(getSystem()) {{
             Creator<Shard> creator = new Creator<Shard>() {
+                private static final long serialVersionUID = 1L;
+
                 @Override
                 public Shard create() throws Exception {
                     return new Shard(shardID, Collections.<String,String>emptyMap(),
@@ -797,6 +922,45 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test
+    public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testForwardedReadyTransactionWithImmediateCommit");
+
+            waitUntilLeader(shard);
+
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+            String transactionID = "tx1";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+                    TestModel.TEST_PATH, containerNode, modification);
+
+            FiniteDuration duration = duration("5 seconds");
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true, true), getRef());
+
+            expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+            InOrder inOrder = inOrder(cohort);
+            inOrder.verify(cohort).canCommit();
+            inOrder.verify(cohort).preCommit();
+            inOrder.verify(cohort).commit();
+
+            NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
+            assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @Test
     public void testCommitWithPersistenceDisabled() throws Throwable {
         dataStoreContextBuilder.persistent(false);
@@ -807,14 +971,24 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+            // Setup a simulated transactions with a mock cohort.
+
             String transactionID = "tx";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+                    TestModel.TEST_PATH, containerNode, modification);
+
             FiniteDuration duration = duration("5 seconds");
 
-            // Send a BatchedModifications to start a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -828,6 +1002,11 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
+            InOrder inOrder = inOrder(cohort);
+            inOrder.verify(cohort).canCommit();
+            inOrder.verify(cohort).preCommit();
+            inOrder.verify(cohort).commit();
+
             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
 
@@ -835,6 +1014,25 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    private static DataTreeCandidateTip mockCandidate(final String name) {
+        DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
+        DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
+        doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
+        doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
+        doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
+        doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
+        return mockCandidate;
+    }
+
+    private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
+        DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
+        DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
+        doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
+        doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
+        doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
+        return mockCandidate;
+    }
+
     @Test
     public void testCommitWhenTransactionHasNoModifications(){
         // Note that persistence is enabled which would normally result in the entry getting written to the journal
@@ -849,10 +1047,11 @@ public class ShardTest extends AbstractShardTest {
 
                 String transactionID = "tx1";
                 MutableCompositeModification modification = new MutableCompositeModification();
-                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+                doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
 
                 FiniteDuration duration = duration("5 seconds");
 
@@ -860,8 +1059,8 @@ public class ShardTest extends AbstractShardTest {
                 // by the ShardTransaction.
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                        cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                        cohort, modification, true, false), getRef());
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
@@ -904,10 +1103,11 @@ public class ShardTest extends AbstractShardTest {
                 String transactionID = "tx1";
                 MutableCompositeModification modification = new MutableCompositeModification();
                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
-                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+                doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
 
                 FiniteDuration duration = duration("5 seconds");
 
@@ -915,8 +1115,8 @@ public class ShardTest extends AbstractShardTest {
                 // by the ShardTransaction.
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                        cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                        cohort, modification, true, false), getRef());
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
@@ -935,7 +1135,7 @@ public class ShardTest extends AbstractShardTest {
 
                 // Use MBean for verification
                 // Committed transaction count should increase as usual
-                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+                assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
 
                 // Commit index should advance as we do not have an empty modification
                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
@@ -955,40 +1155,35 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            // Setup 2 mock cohorts. The first one fails in the commit phase.
+            // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+            // commit phase.
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
+            doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Send BatchedModifications to start and ready each transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -1040,45 +1235,66 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
-
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
+            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
+            final Timeout timeout = new Timeout(duration);
 
-            // Send BatchedModifications to start and ready a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            // Send the CanCommitTransaction message.
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            // Send the CanCommitTransaction message for the first Tx.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send the CommitTransaction message. This should send back an error
-            // for preCommit failure.
+            // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
+            // processed after the first Tx completes.
+
+            Future<Object> canCommitFuture = Patterns.ask(shard,
+                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
 
-            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+            // Send the CommitTransaction message for the first Tx. This should send back an error
+            // and trigger the 2nd Tx to proceed.
+
+            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
-            InOrder inOrder = inOrder(cohort);
-            inOrder.verify(cohort).canCommit();
-            inOrder.verify(cohort).preCommit();
+            // Wait for the 2nd Tx to complete the canCommit phase.
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            canCommitFuture.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(final Throwable t, final Object resp) {
+                    latch.countDown();
+                }
+            }, getSystem().dispatcher());
+
+            assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+
+            InOrder inOrder = inOrder(cohort1, cohort2);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort1).preCommit();
+            inOrder.verify(cohort2).canCommit();
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -1095,31 +1311,183 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // Send the CanCommitTransaction message.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
 
-            // Send BatchedModifications to start and ready a transaction.
+            // Send another can commit to ensure the failed one got cleaned up.
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            reset(cohort);
+
+            String transactionID2 = "tx2";
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("getCanCommit", true, reply.getCanCommit());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testCanCommitPhaseFalseResponse() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCanCommitPhaseFalseResponse");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+            doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("getCanCommit", false, reply.getCanCommit());
+
+            // Send another can commit to ensure the failed one got cleaned up.
+
+            reset(cohort);
+
+            String transactionID2 = "tx2";
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            reply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("getCanCommit", true, reply.getCanCommit());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testImmediateCommitWithCanCommitPhaseFailure");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort, modification, true, true), getRef());
+
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+            // Send another can commit to ensure the failed one got cleaned up.
+
+            reset(cohort);
+
+            String transactionID2 = "tx2";
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+            doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
+            doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+            DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
+            DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
+            doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
+            doReturn(candidateRoot).when(candidate).getRootNode();
+            doReturn(candidate).when(cohort).getCandidate();
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort, modification, true, true), getRef());
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testImmediateCommitWithCanCommitPhaseFalseResponse");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            String transactionID = "tx1";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+            doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true, true), getRef());
+
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
+            // Send another can commit to ensure the failed one got cleaned up.
+
+            reset(cohort);
+
+            String transactionID2 = "tx2";
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+            doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
+            doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+            DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
+            DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
+            doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
+            doReturn(candidateRoot).when(candidate).getRootNode();
+            doReturn(candidate).when(cohort).getCandidate();
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort, modification, true, true), getRef());
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
@@ -1134,13 +1502,13 @@ public class ShardTest extends AbstractShardTest {
             waitUntilLeader(shard);
 
             final FiniteDuration duration = duration("5 seconds");
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             final String transactionID = "tx1";
-            Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
-                          new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
+            Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
+                          new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
                 @Override
-                public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
+                public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
 
                     // Simulate an AbortTransaction message occurring during replication, after
@@ -1157,9 +1525,14 @@ public class ShardTest extends AbstractShardTest {
                 }
             };
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification = new MutableCompositeModification();
+            ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                    modification, preCommit);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
@@ -1193,26 +1566,42 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
             writeToStore(shard, TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
-            // Create and ready the 1st Tx - will timeout
+            // Create 1st Tx - will timeout
 
             String transactionID1 = "tx1";
-            shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification1);
 
-            // Create and ready the 2nd Tx
+            // Create 2nd Tx
 
-            String transactionID2 = "tx2";
+            String transactionID2 = "tx3";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
-            shard.tell(newBatchedModifications(transactionID2, listNodePath,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+                .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+                    listNodePath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
+                    modification2);
+
+            // Ready the Tx's
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
 
@@ -1224,6 +1613,11 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
 
+            // Try to commit the 1st Tx - should fail as it's not the current Tx.
+
+            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
+
             // Commit the 2nd Tx.
 
             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
@@ -1249,23 +1643,38 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
             String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
             String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
             String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
 
-            // Send a BatchedModifications to start transactions and ready them.
+            // Ready the Tx's
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx.
 
@@ -1310,37 +1719,30 @@ public class ShardTest extends AbstractShardTest {
 
             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
-            // Send BatchedModifications to start and ready each transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -1386,68 +1788,39 @@ public class ShardTest extends AbstractShardTest {
     @SuppressWarnings("serial")
     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
 
-        final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
-        class DelegatingPersistentDataProvider implements DataPersistenceProvider {
-            DataPersistenceProvider delegate;
-
-            DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
-                this.delegate = delegate;
-            }
-
-            @Override
-            public boolean isRecoveryApplicable() {
-                return delegate.isRecoveryApplicable();
-            }
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
 
-            @Override
-            public <T> void persist(T o, Procedure<T> procedure) {
-                delegate.persist(o, procedure);
+        final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
+        class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
+            TestPersistentDataProvider(DataPersistenceProvider delegate) {
+                super(delegate);
             }
 
             @Override
             public void saveSnapshot(Object o) {
                 savedSnapshot.set(o);
-                delegate.saveSnapshot(o);
-            }
-
-            @Override
-            public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-                delegate.deleteSnapshots(criteria);
-            }
-
-            @Override
-            public void deleteMessages(long sequenceNumber) {
-                delegate.deleteMessages(sequenceNumber);
+                super.saveSnapshot(o);
             }
         }
 
         dataStoreContextBuilder.persistent(persistent);
 
-
-
         new ShardTestKit(getSystem()) {{
-            final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-
             class TestShard extends Shard {
 
                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
                     super(name, peerAddresses, datastoreContext, schemaContext);
+                    setPersistence(new TestPersistentDataProvider(super.persistence()));
                 }
 
-                DelegatingPersistentDataProvider delegating;
+                @Override
+                public void handleCommand(Object message) {
+                    super.handleCommand(message);
 
-                protected DataPersistenceProvider persistence() {
-                    if(delegating == null) {
-                        delegating = new DelegatingPersistentDataProvider(super.persistence());
+                    if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+                        latch.get().countDown();
                     }
-                    return delegating;
-                }
-
-                @Override
-                protected void commitSnapshot(final long sequenceNumber) {
-                    super.commitSnapshot(sequenceNumber);
-                    latch.get().countDown();
                 }
 
                 @Override
@@ -1510,29 +1883,29 @@ public class ShardTest extends AbstractShardTest {
     /**
      * This test simply verifies that the applySnapShot logic will work
      * @throws ReadFailedException
+     * @throws DataValidationFailedException
      */
     @Test
-    public void testInMemoryDataStoreRestore() throws ReadFailedException {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
+    public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
+        DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        store.setSchemaContext(SCHEMA_CONTEXT);
 
-        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
-
-        DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
+        DataTreeModification putTransaction = store.takeSnapshot().newModification();
         putTransaction.write(TestModel.TEST_PATH,
             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-        commitTransaction(putTransaction);
+        commitTransaction(store, putTransaction);
 
 
-        NormalizedNode<?, ?> expected = readStore(store);
+        NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
 
-        DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
+        DataTreeModification writeTransaction = store.takeSnapshot().newModification();
 
         writeTransaction.delete(YangInstanceIdentifier.builder().build());
         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
 
-        commitTransaction(writeTransaction);
+        commitTransaction(store, writeTransaction);
 
-        NormalizedNode<?, ?> actual = readStore(store);
+        NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
 
         assertEquals(expected, actual);
     }
@@ -1556,14 +1929,14 @@ public class ShardTest extends AbstractShardTest {
             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
                     persistentProps, "testPersistence1");
 
-            assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+            assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
 
             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
                     nonPersistentProps, "testPersistence2");
 
-            assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+            assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
 
             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
@@ -1579,19 +1952,19 @@ public class ShardTest extends AbstractShardTest {
             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             waitUntilLeader(shard);
 
             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", false,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -1641,15 +2014,9 @@ public class ShardTest extends AbstractShardTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    private void commitTransaction(final DOMStoreWriteTransaction transaction) {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        ListenableFuture<Void> future =
-            commitCohort.preCommit();
-        try {
-            future.get();
-            future = commitCohort.commit();
-            future.get();
-        } catch (InterruptedException | ExecutionException e) {
-        }
+    private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
+        modification.ready();
+        store.validate(modification);
+        store.commit(store.prepare(modification));
     }
 }