Merge changes I114cbac1,I45c2e7cd
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index adc7f4706c1e871f6ddab28bf8364e06a975d0fc..e3b82df1743e75c433cec193d54a2cbfbd696319 100644 (file)
@@ -16,9 +16,8 @@ import akka.actor.Props;
 import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
-import akka.japi.Procedure;
 import akka.pattern.Patterns;
-import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.SaveSnapshotSuccess;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
@@ -41,6 +40,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -69,13 +70,13 @@ import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListene
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -99,6 +100,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -120,7 +122,7 @@ public class ShardTest extends AbstractShardTest {
                     "testRegisterChangeListener-DataChangeListener");
 
             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                    dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+                    dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
 
             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterChangeListenerReply.class);
@@ -158,8 +160,12 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
+                    // Use a non persistent provider because this test actually invokes persist on the journal
+                    // this will cause all other messages to not be queued properly after that.
+                    // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
+                    // it does do a persist)
                     return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT) {
+                            dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
@@ -208,7 +214,7 @@ public class ShardTest extends AbstractShardTest {
                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
             // Now send the RegisterChangeListener and wait for the reply.
-            shard.tell(new RegisterChangeListener(path, dclActor.path(),
+            shard.tell(new RegisterChangeListener(path, dclActor,
                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
 
             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
@@ -431,42 +437,42 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            final String transactionID1 = "tx1";
-            final String transactionID2 = "tx2";
-            final String transactionID3 = "tx3";
+         // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
 
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
-                    if(transactionID.equals(transactionID1)) {
-                        mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
-                        return mockCohort1.get();
-                    } else if(transactionID.equals(transactionID2)) {
-                        mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
-                        return mockCohort2.get();
-                    } else {
-                        mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
-                        return mockCohort3.get();
-                    }
-                }
-            };
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification3);
 
             long timeoutSec = 5;
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
-            // Send a BatchedModifications message for the first transaction.
+            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
-            assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
-            assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
+                    expectMsgClass(duration, ReadyTransactionReply.class));
+            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -475,16 +481,15 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send BatchedModifications for the next 2 Tx's.
+            // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
             // processed after the first Tx completes.
@@ -577,16 +582,16 @@ public class ShardTest extends AbstractShardTest {
 
             assertEquals("Commits complete", true, done);
 
-            InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
-            inOrder.verify(mockCohort1.get()).canCommit();
-            inOrder.verify(mockCohort1.get()).preCommit();
-            inOrder.verify(mockCohort1.get()).commit();
-            inOrder.verify(mockCohort2.get()).canCommit();
-            inOrder.verify(mockCohort2.get()).preCommit();
-            inOrder.verify(mockCohort2.get()).commit();
-            inOrder.verify(mockCohort3.get()).canCommit();
-            inOrder.verify(mockCohort3.get()).preCommit();
-            inOrder.verify(mockCohort3.get()).commit();
+            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort1).preCommit();
+            inOrder.verify(cohort1).commit();
+            inOrder.verify(cohort2).canCommit();
+            inOrder.verify(cohort2).preCommit();
+            inOrder.verify(cohort2).commit();
+            inOrder.verify(cohort3).canCommit();
+            inOrder.verify(cohort3).preCommit();
+            inOrder.verify(cohort3).commit();
 
             // Verify data in the data store.
 
@@ -664,7 +669,7 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -723,7 +728,7 @@ public class ShardTest extends AbstractShardTest {
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
                     containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Create a read Tx on the same chain.
 
@@ -805,14 +810,24 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            // Setup a simulated transactions with a mock cohort.
+
             String transactionID = "tx";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+                    TestModel.TEST_PATH, containerNode, modification);
+
             FiniteDuration duration = duration("5 seconds");
 
-            // Send a BatchedModifications to start a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -826,6 +841,11 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
+            InOrder inOrder = inOrder(cohort);
+            inOrder.verify(cohort).canCommit();
+            inOrder.verify(cohort).preCommit();
+            inOrder.verify(cohort).commit();
+
             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
 
@@ -859,7 +879,7 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
@@ -914,7 +934,7 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
@@ -933,7 +953,7 @@ public class ShardTest extends AbstractShardTest {
 
                 // Use MBean for verification
                 // Committed transaction count should increase as usual
-                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+                assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
 
                 // Commit index should advance as we do not have an empty modification
                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
@@ -953,40 +973,34 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            // Setup 2 mock cohorts. The first one fails in the commit phase.
+         // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+            // commit phase.
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Send BatchedModifications to start and ready each transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -1039,27 +1053,19 @@ public class ShardTest extends AbstractShardTest {
             waitUntilLeader(shard);
 
             String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             FiniteDuration duration = duration("5 seconds");
 
-            // Send BatchedModifications to start and ready a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -1094,24 +1100,16 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
 
             String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
-            // Send BatchedModifications to start and ready a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -1155,9 +1153,14 @@ public class ShardTest extends AbstractShardTest {
                 }
             };
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                    modification, preCommit);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
@@ -1191,26 +1194,42 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
             writeToStore(shard, TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
-            // Create and ready the 1st Tx - will timeout
+            // Create 1st Tx - will timeout
 
             String transactionID1 = "tx1";
-            shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification1);
 
-            // Create and ready the 2nd Tx
+            // Create 2nd Tx
 
-            String transactionID2 = "tx2";
+            String transactionID2 = "tx3";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
-            shard.tell(newBatchedModifications(transactionID2, listNodePath,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+                .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+                    listNodePath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
+                    modification2);
+
+            // Ready the Tx's
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
 
@@ -1247,23 +1266,38 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
             String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
             String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
             String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
 
-            // Send a BatchedModifications to start transactions and ready them.
+            // Ready the Tx's
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx.
 
@@ -1308,37 +1342,30 @@ public class ShardTest extends AbstractShardTest {
 
             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            // Send BatchedModifications to start and ready each transaction.
-
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
-
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -1384,68 +1411,52 @@ public class ShardTest extends AbstractShardTest {
     @SuppressWarnings("serial")
     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
 
-        final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
-        class DelegatingPersistentDataProvider implements DataPersistenceProvider {
-            DataPersistenceProvider delegate;
-
-            DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
-                this.delegate = delegate;
-            }
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
 
-            @Override
-            public boolean isRecoveryApplicable() {
-                return delegate.isRecoveryApplicable();
-            }
-
-            @Override
-            public <T> void persist(T o, Procedure<T> procedure) {
-                delegate.persist(o, procedure);
+        final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
+        class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
+            TestPersistentDataProvider(DataPersistenceProvider delegate) {
+                super(delegate);
             }
 
             @Override
             public void saveSnapshot(Object o) {
                 savedSnapshot.set(o);
-                delegate.saveSnapshot(o);
-            }
-
-            @Override
-            public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-                delegate.deleteSnapshots(criteria);
-            }
-
-            @Override
-            public void deleteMessages(long sequenceNumber) {
-                delegate.deleteMessages(sequenceNumber);
+                super.saveSnapshot(o);
             }
         }
 
         dataStoreContextBuilder.persistent(persistent);
 
         new ShardTestKit(getSystem()) {{
-            final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-            Creator<Shard> creator = new Creator<Shard>() {
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT) {
+            class TestShard extends Shard {
 
-                        DelegatingPersistentDataProvider delegating;
+                protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
+                                    DatastoreContext datastoreContext, SchemaContext schemaContext) {
+                    super(name, peerAddresses, datastoreContext, schemaContext);
+                    setPersistence(new TestPersistentDataProvider(super.persistence()));
+                }
 
-                        @Override
-                        protected DataPersistenceProvider persistence() {
-                            if(delegating == null) {
-                                delegating = new DelegatingPersistentDataProvider(super.persistence());
-                            }
+                @Override
+                public void handleCommand(Object message) {
+                    super.handleCommand(message);
 
-                            return delegating;
-                        }
+                    if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+                        latch.get().countDown();
+                    }
+                }
 
-                        @Override
-                        protected void commitSnapshot(final long sequenceNumber) {
-                            super.commitSnapshot(sequenceNumber);
-                            latch.get().countDown();
-                        }
-                    };
+                @Override
+                public RaftActorContext getRaftActorContext() {
+                    return super.getRaftActorContext();
+                }
+            }
+
+            Creator<Shard> creator = new Creator<Shard>() {
+                @Override
+                public Shard create() throws Exception {
+                    return new TestShard(shardID, Collections.<String,String>emptyMap(),
+                            newDatastoreContext(), SCHEMA_CONTEXT);
                 }
             };
 
@@ -1458,8 +1469,9 @@ public class ShardTest extends AbstractShardTest {
 
             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
 
-            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
-            shard.tell(capture, getRef());
+            // Trigger creation of a snapshot by ensuring
+            RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
+            raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
@@ -1471,7 +1483,7 @@ public class ShardTest extends AbstractShardTest {
             latch.set(new CountDownLatch(1));
             savedSnapshot.set(null);
 
-            shard.tell(capture, getRef());
+            raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
@@ -1540,14 +1552,14 @@ public class ShardTest extends AbstractShardTest {
             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
                     persistentProps, "testPersistence1");
 
-            assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+            assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
 
             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
                     nonPersistentProps, "testPersistence2");
 
-            assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+            assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
 
             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
@@ -1563,19 +1575,19 @@ public class ShardTest extends AbstractShardTest {
             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             waitUntilLeader(shard);
 
             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", false,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};