Merge changes I114cbac1,I45c2e7cd
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index e04c1a5d185ffbb5bc81b4d035a07545c02ef3a7..e3b82df1743e75c433cec193d54a2cbfbd696319 100644 (file)
@@ -17,6 +17,7 @@ import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
+import akka.persistence.SaveSnapshotSuccess;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
@@ -436,42 +437,42 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            final String transactionID1 = "tx1";
-            final String transactionID2 = "tx2";
-            final String transactionID3 = "tx3";
+         // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
 
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
-                    if(transactionID.equals(transactionID1)) {
-                        mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
-                        return mockCohort1.get();
-                    } else if(transactionID.equals(transactionID2)) {
-                        mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
-                        return mockCohort2.get();
-                    } else {
-                        mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
-                        return mockCohort3.get();
-                    }
-                }
-            };
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification3);
 
             long timeoutSec = 5;
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
-            // Send a BatchedModifications message for the first transaction.
+            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
-            assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
-            assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
+                    expectMsgClass(duration, ReadyTransactionReply.class));
+            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -480,16 +481,15 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send BatchedModifications for the next 2 Tx's.
+            // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
             // processed after the first Tx completes.
@@ -582,16 +582,16 @@ public class ShardTest extends AbstractShardTest {
 
             assertEquals("Commits complete", true, done);
 
-            InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
-            inOrder.verify(mockCohort1.get()).canCommit();
-            inOrder.verify(mockCohort1.get()).preCommit();
-            inOrder.verify(mockCohort1.get()).commit();
-            inOrder.verify(mockCohort2.get()).canCommit();
-            inOrder.verify(mockCohort2.get()).preCommit();
-            inOrder.verify(mockCohort2.get()).commit();
-            inOrder.verify(mockCohort3.get()).canCommit();
-            inOrder.verify(mockCohort3.get()).preCommit();
-            inOrder.verify(mockCohort3.get()).commit();
+            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort1).preCommit();
+            inOrder.verify(cohort1).commit();
+            inOrder.verify(cohort2).canCommit();
+            inOrder.verify(cohort2).preCommit();
+            inOrder.verify(cohort2).commit();
+            inOrder.verify(cohort3).canCommit();
+            inOrder.verify(cohort3).preCommit();
+            inOrder.verify(cohort3).commit();
 
             // Verify data in the data store.
 
@@ -669,7 +669,7 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -728,7 +728,7 @@ public class ShardTest extends AbstractShardTest {
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
                     containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Create a read Tx on the same chain.
 
@@ -810,14 +810,24 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            // Setup a simulated transactions with a mock cohort.
+
             String transactionID = "tx";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+                    TestModel.TEST_PATH, containerNode, modification);
+
             FiniteDuration duration = duration("5 seconds");
 
-            // Send a BatchedModifications to start a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -831,6 +841,11 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
+            InOrder inOrder = inOrder(cohort);
+            inOrder.verify(cohort).canCommit();
+            inOrder.verify(cohort).preCommit();
+            inOrder.verify(cohort).commit();
+
             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
 
@@ -864,7 +879,7 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
@@ -919,7 +934,7 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
@@ -958,40 +973,34 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            // Setup 2 mock cohorts. The first one fails in the commit phase.
+         // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+            // commit phase.
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Send BatchedModifications to start and ready each transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -1044,27 +1053,19 @@ public class ShardTest extends AbstractShardTest {
             waitUntilLeader(shard);
 
             String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             FiniteDuration duration = duration("5 seconds");
 
-            // Send BatchedModifications to start and ready a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -1099,24 +1100,16 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
 
             String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
-            // Send BatchedModifications to start and ready a transaction.
-
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -1160,9 +1153,14 @@ public class ShardTest extends AbstractShardTest {
                 }
             };
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                    modification, preCommit);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
@@ -1196,26 +1194,42 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
             writeToStore(shard, TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
-            // Create and ready the 1st Tx - will timeout
+            // Create 1st Tx - will timeout
 
             String transactionID1 = "tx1";
-            shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification1);
 
-            // Create and ready the 2nd Tx
+            // Create 2nd Tx
 
-            String transactionID2 = "tx2";
+            String transactionID2 = "tx3";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
-            shard.tell(newBatchedModifications(transactionID2, listNodePath,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+                .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+                    listNodePath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
+                    modification2);
+
+            // Ready the Tx's
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
 
@@ -1252,23 +1266,38 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
             String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
             String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
             String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
 
-            // Send a BatchedModifications to start transactions and ready them.
+            // Ready the Tx's
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx.
 
@@ -1313,37 +1342,30 @@ public class ShardTest extends AbstractShardTest {
 
             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
-            // Send BatchedModifications to start and ready each transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -1389,6 +1411,8 @@ public class ShardTest extends AbstractShardTest {
     @SuppressWarnings("serial")
     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
 
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
+
         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
             TestPersistentDataProvider(DataPersistenceProvider delegate) {
@@ -1405,8 +1429,6 @@ public class ShardTest extends AbstractShardTest {
         dataStoreContextBuilder.persistent(persistent);
 
         new ShardTestKit(getSystem()) {{
-            final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-
             class TestShard extends Shard {
 
                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
@@ -1416,9 +1438,12 @@ public class ShardTest extends AbstractShardTest {
                 }
 
                 @Override
-                protected void commitSnapshot(final long sequenceNumber) {
-                    super.commitSnapshot(sequenceNumber);
-                    latch.get().countDown();
+                public void handleCommand(Object message) {
+                    super.handleCommand(message);
+
+                    if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+                        latch.get().countDown();
+                    }
                 }
 
                 @Override