Convert all tests that use ForwardReadyTransaction to use BatchedModifications
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 6c517a464f5ed66b4304ed0d62fbbab7c35e9eff..25e37edf714cc8c6b2c6a6cf33ccc47b854c287f 100644 (file)
@@ -60,7 +60,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
@@ -550,8 +549,8 @@ public class ShardTest extends AbstractShardTest {
         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
 
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-            SerializationUtils.serializeNormalizedNode(root),
-            Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+                SerializationUtils.serializeNormalizedNode(root),
+                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
         return testStore;
     }
 
@@ -678,11 +677,7 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.class));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
@@ -694,14 +689,10 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send the ForwardedReadyTransaction for the next 2 Tx's.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
@@ -1132,11 +1123,16 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
+    public void testReadyWithImmediateCommit() throws Exception{
+        testReadyWithImmediateCommit(true);
+        testReadyWithImmediateCommit(false);
+    }
+
+    public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testForwardedReadyTransactionWithImmediateCommit");
+                    "testReadyWithImmediateCommit-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1150,11 +1146,7 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
 
             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
 
@@ -1250,11 +1242,16 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCommitWithPersistenceDisabled() throws Throwable {
+        testCommitWithPersistenceDisabled(true);
+        testCommitWithPersistenceDisabled(false);
+    }
+
+    public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
         dataStoreContextBuilder.persistent(false);
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCommitWithPersistenceDisabled");
+                    "testCommitWithPersistenceDisabled-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1270,11 +1267,7 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1321,14 +1314,19 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testCommitWhenTransactionHasNoModifications(){
+    public void testCommitWhenTransactionHasNoModifications() {
+        testCommitWhenTransactionHasNoModifications(true);
+        testCommitWhenTransactionHasNoModifications(false);
+    }
+
+    public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
         // Note that persistence is enabled which would normally result in the entry getting written to the journal
         // but here that need not happen
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testCommitWhenTransactionHasNoModifications");
+                        "testCommitWhenTransactionHasNoModifications-" + readWrite);
 
                 waitUntilLeader(shard);
 
@@ -1342,11 +1340,7 @@ public class ShardTest extends AbstractShardTest {
 
                 final FiniteDuration duration = duration("5 seconds");
 
-                // Simulate the ForwardedReadyTransaction messages that would be sent
-                // by the ShardTransaction.
-
-                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                        cohort, modification, true, false), getRef());
+                shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
                 expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
@@ -1381,12 +1375,17 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testCommitWhenTransactionHasModifications(){
+    public void testCommitWhenTransactionHasModifications() {
+        testCommitWhenTransactionHasModifications(true);
+        testCommitWhenTransactionHasModifications(false);
+    }
+
+    public void testCommitWhenTransactionHasModifications(final boolean readWrite){
         new ShardTestKit(getSystem()) {
             {
                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testCommitWhenTransactionHasModifications");
+                        "testCommitWhenTransactionHasModifications-" + readWrite);
 
                 waitUntilLeader(shard);
 
@@ -1401,11 +1400,7 @@ public class ShardTest extends AbstractShardTest {
 
                 final FiniteDuration duration = duration("5 seconds");
 
-                // Simulate the ForwardedReadyTransaction messages that would be sent
-                // by the ShardTransaction.
-
-                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                        cohort, modification, true, false), getRef());
+                shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
                 expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
@@ -1441,10 +1436,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCommitPhaseFailure() throws Throwable {
+        testCommitPhaseFailure(true);
+        testCommitPhaseFailure(false);
+    }
+
+    public void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCommitPhaseFailure");
+                    "testCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1467,15 +1467,10 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1521,10 +1516,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testPreCommitPhaseFailure() throws Throwable {
+        testPreCommitPhaseFailure(true);
+        testPreCommitPhaseFailure(false);
+    }
+
+    public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testPreCommitPhaseFailure");
+                    "testPreCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1542,15 +1542,10 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1595,10 +1590,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCanCommitPhaseFailure() throws Throwable {
+        testCanCommitPhaseFailure(true);
+        testCanCommitPhaseFailure(false);
+    }
+
+    public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCanCommitPhaseFailure");
+                    "testCanCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1609,11 +1609,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1628,8 +1624,7 @@ public class ShardTest extends AbstractShardTest {
             final String transactionID2 = "tx2";
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@@ -1643,10 +1638,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testCanCommitPhaseFalseResponse() throws Throwable {
+        testCanCommitPhaseFalseResponse(true);
+        testCanCommitPhaseFalseResponse(false);
+    }
+
+    public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCanCommitPhaseFalseResponse");
+                    "testCanCommitPhaseFalseResponse-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1657,11 +1657,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1678,8 +1674,7 @@ public class ShardTest extends AbstractShardTest {
             final String transactionID2 = "tx2";
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@@ -1693,10 +1688,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
+        testImmediateCommitWithCanCommitPhaseFailure(true);
+        testImmediateCommitWithCanCommitPhaseFailure(false);
+    }
+
+    public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testImmediateCommitWithCanCommitPhaseFailure");
+                    "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1707,11 +1707,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
 
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
@@ -1729,8 +1725,7 @@ public class ShardTest extends AbstractShardTest {
             doReturn(candidateRoot).when(candidate).getRootNode();
             doReturn(candidate).when(cohort).getCandidate();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1740,10 +1735,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
+        testImmediateCommitWithCanCommitPhaseFalseResponse(true);
+        testImmediateCommitWithCanCommitPhaseFalseResponse(false);
+    }
+
+    public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testImmediateCommitWithCanCommitPhaseFalseResponse");
+                    "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1754,11 +1754,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
 
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
@@ -1776,8 +1772,7 @@ public class ShardTest extends AbstractShardTest {
             doReturn(candidateRoot).when(candidate).getRootNode();
             doReturn(candidate).when(cohort).getCandidate();
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort, modification, true, true), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
 
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1787,10 +1782,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testAbortBeforeFinishCommit() throws Throwable {
+        testAbortBeforeFinishCommit(true);
+        testAbortBeforeFinishCommit(false);
+    }
+
+    public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortBeforeFinishCommit");
+                    "testAbortBeforeFinishCommit-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1823,8 +1823,7 @@ public class ShardTest extends AbstractShardTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                     modification, preCommit);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
@@ -1848,12 +1847,17 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testTransactionCommitTimeout() throws Throwable {
+        testTransactionCommitTimeout(true);
+        testTransactionCommitTimeout(false);
+    }
+
+    public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testTransactionCommitTimeout");
+                    "testTransactionCommitTimeout-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -1888,12 +1892,10 @@ public class ShardTest extends AbstractShardTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
@@ -1957,18 +1959,15 @@ public class ShardTest extends AbstractShardTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // The 3rd Tx should exceed queue capacity and fail.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // canCommit 1st Tx.
@@ -2009,8 +2008,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             final String transactionID2 = "tx2";
@@ -2018,8 +2016,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             final String transactionID3 = "tx3";
@@ -2027,8 +2024,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                cohort3, modification3, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
@@ -2061,8 +2057,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // CanCommit the first one so it's the current in-progress CohortEntry.
@@ -2077,8 +2072,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Ready the third Tx.
@@ -2125,10 +2119,15 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testAbortCurrentTransaction() throws Throwable {
+        testAbortCurrentTransaction(true);
+        testAbortCurrentTransaction(false);
+    }
+
+    public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortCurrentTransaction");
+                    "testAbortCurrentTransaction-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -2148,15 +2147,10 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -2192,6 +2186,11 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testAbortQueuedTransaction() throws Throwable {
+        testAbortQueuedTransaction(true);
+        testAbortQueuedTransaction(false);
+    }
+
+    public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
@@ -2215,7 +2214,7 @@ public class ShardTest extends AbstractShardTest {
 
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
-                            Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+                            Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
 
             waitUntilLeader(shard);
 
@@ -2229,8 +2228,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Ready the tx.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true, false), getRef());
+            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());