From: Moiz Raja Date: Sun, 15 Nov 2015 20:23:13 +0000 (-0800) Subject: Convert all tests that use ForwardReadyTransaction to use BatchedModifications X-Git-Tag: release/beryllium~96 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1b244fc4c1f40bb663b6763c8deb25850dbbf245 Convert all tests that use ForwardReadyTransaction to use BatchedModifications The ShardTests that cover various scenarios were all using ForwardedReadyTransaction which I have now migrated to use BatchedModifications. This removes most direct usages of this message. Change-Id: I7b3eae972dfa3daf30d24795fc3f34a699f98eab Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 36aa9a2721..c03615cffd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -13,6 +13,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; @@ -35,6 +36,8 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.raft.TestActorFactory; @@ -245,6 +248,51 @@ public abstract class AbstractShardTest extends AbstractActorTest{ return cohort; } + protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort, + String transactionID, + MutableCompositeModification modification, + boolean doCommitOnReady) { + if(remoteReadWriteTransaction){ + return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true, doCommitOnReady); + } else { + setupCohortDecorator(shard, cohort); + return prepareBatchedModifications(transactionID, modification, doCommitOnReady); + } + } + + protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort, + String transactionID, + MutableCompositeModification modification) { + return prepareReadyTransactionMessage(remoteReadWriteTransaction, shard, cohort, transactionID, modification, false); + } + + protected void setupCohortDecorator(Shard shard, final ShardDataTreeCohort cohort) { + shard.getCommitCoordinator().setCohortDecorator(new ShardCommitCoordinator.CohortDecorator() { + @Override + public ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual) { + return cohort; + } + }); + } + + protected BatchedModifications prepareBatchedModifications(String transactionID, + MutableCompositeModification modification) { + return prepareBatchedModifications(transactionID, modification, false); + } + + private BatchedModifications prepareBatchedModifications(String transactionID, + MutableCompositeModification modification, + boolean doCommitOnReady) { + final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION, null); + batchedModifications.addModification(modification); + batchedModifications.setReady(true); + batchedModifications.setDoCommitOnReady(doCommitOnReady); + batchedModifications.setTotalMessagesSent(1); + return batchedModifications; + } + + public static NormalizedNode readStore(final TestActorRef shard, final YangInstanceIdentifier id) throws ExecutionException, InterruptedException { return shard.underlyingActor().getDataStore().readNode(id).orNull(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 6c517a464f..25e37edf71 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -60,7 +60,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; @@ -550,8 +549,8 @@ public class ShardTest extends AbstractShardTest { final NormalizedNode root = readStore(testStore, YangInstanceIdentifier.builder().build()); InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( - SerializationUtils.serializeNormalizedNode(root), - Collections.emptyList(), 0, 1, -1, -1)); + SerializationUtils.serializeNormalizedNode(root), + Collections.emptyList(), 0, 1, -1, -1)); return testStore; } @@ -678,11 +677,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.class)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); @@ -694,14 +689,10 @@ public class ShardTest extends AbstractShardTest { expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - // Send the ForwardedReadyTransaction for the next 2 Tx's. - - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and @@ -1132,11 +1123,16 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{ + public void testReadyWithImmediateCommit() throws Exception{ + testReadyWithImmediateCommit(true); + testReadyWithImmediateCommit(false); + } + + public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{ new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testForwardedReadyTransactionWithImmediateCommit"); + "testReadyWithImmediateCommit-" + readWrite); waitUntilLeader(shard); @@ -1150,11 +1146,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, true), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef()); expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class); @@ -1250,11 +1242,16 @@ public class ShardTest extends AbstractShardTest { @Test public void testCommitWithPersistenceDisabled() throws Throwable { + testCommitWithPersistenceDisabled(true); + testCommitWithPersistenceDisabled(false); + } + + public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable { dataStoreContextBuilder.persistent(false); new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitWithPersistenceDisabled"); + "testCommitWithPersistenceDisabled-" + readWrite); waitUntilLeader(shard); @@ -1270,11 +1267,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1321,14 +1314,19 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testCommitWhenTransactionHasNoModifications(){ + public void testCommitWhenTransactionHasNoModifications() { + testCommitWhenTransactionHasNoModifications(true); + testCommitWhenTransactionHasNoModifications(false); + } + + public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){ // Note that persistence is enabled which would normally result in the entry getting written to the journal // but here that need not happen new ShardTestKit(getSystem()) { { final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitWhenTransactionHasNoModifications"); + "testCommitWhenTransactionHasNoModifications-" + readWrite); waitUntilLeader(shard); @@ -1342,11 +1340,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1381,12 +1375,17 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testCommitWhenTransactionHasModifications(){ + public void testCommitWhenTransactionHasModifications() { + testCommitWhenTransactionHasModifications(true); + testCommitWhenTransactionHasModifications(false); + } + + public void testCommitWhenTransactionHasModifications(final boolean readWrite){ new ShardTestKit(getSystem()) { { final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitWhenTransactionHasModifications"); + "testCommitWhenTransactionHasModifications-" + readWrite); waitUntilLeader(shard); @@ -1401,11 +1400,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1441,10 +1436,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testCommitPhaseFailure() throws Throwable { + testCommitPhaseFailure(true); + testCommitPhaseFailure(false); + } + + public void testCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitPhaseFailure"); + "testCommitPhaseFailure-" + readWrite); waitUntilLeader(shard); @@ -1467,15 +1467,10 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1521,10 +1516,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testPreCommitPhaseFailure() throws Throwable { + testPreCommitPhaseFailure(true); + testPreCommitPhaseFailure(false); + } + + public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testPreCommitPhaseFailure"); + "testPreCommitPhaseFailure-" + readWrite); waitUntilLeader(shard); @@ -1542,15 +1542,10 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1595,10 +1590,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testCanCommitPhaseFailure() throws Throwable { + testCanCommitPhaseFailure(true); + testCanCommitPhaseFailure(false); + } + + public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCanCommitPhaseFailure"); + "testCanCommitPhaseFailure-" + readWrite); waitUntilLeader(shard); @@ -1609,11 +1609,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1628,8 +1624,7 @@ public class ShardTest extends AbstractShardTest { final String transactionID2 = "tx2"; doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); @@ -1643,10 +1638,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testCanCommitPhaseFalseResponse() throws Throwable { + testCanCommitPhaseFalseResponse(true); + testCanCommitPhaseFalseResponse(false); + } + + public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCanCommitPhaseFalseResponse"); + "testCanCommitPhaseFalseResponse-" + readWrite); waitUntilLeader(shard); @@ -1657,11 +1657,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1678,8 +1674,7 @@ public class ShardTest extends AbstractShardTest { final String transactionID2 = "tx2"; doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); @@ -1693,10 +1688,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable { + testImmediateCommitWithCanCommitPhaseFailure(true); + testImmediateCommitWithCanCommitPhaseFailure(false); + } + + public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testImmediateCommitWithCanCommitPhaseFailure"); + "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite); waitUntilLeader(shard); @@ -1707,11 +1707,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort, modification, true, true), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); @@ -1729,8 +1725,7 @@ public class ShardTest extends AbstractShardTest { doReturn(candidateRoot).when(candidate).getRootNode(); doReturn(candidate).when(cohort).getCandidate(); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort, modification, true, true), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); @@ -1740,10 +1735,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable { + testImmediateCommitWithCanCommitPhaseFalseResponse(true); + testImmediateCommitWithCanCommitPhaseFalseResponse(false); + } + + public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testImmediateCommitWithCanCommitPhaseFalseResponse"); + "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite); waitUntilLeader(shard); @@ -1754,11 +1754,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, true), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); @@ -1776,8 +1772,7 @@ public class ShardTest extends AbstractShardTest { doReturn(candidateRoot).when(candidate).getRootNode(); doReturn(candidate).when(cohort).getCandidate(); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort, modification, true, true), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); @@ -1787,10 +1782,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testAbortBeforeFinishCommit() throws Throwable { + testAbortBeforeFinishCommit(true); + testAbortBeforeFinishCommit(false); + } + + public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testAbortBeforeFinishCommit"); + "testAbortBeforeFinishCommit-" + readWrite); waitUntilLeader(shard); @@ -1823,8 +1823,7 @@ public class ShardTest extends AbstractShardTest { TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification, preCommit); - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); @@ -1848,12 +1847,17 @@ public class ShardTest extends AbstractShardTest { @Test public void testTransactionCommitTimeout() throws Throwable { + testTransactionCommitTimeout(true); + testTransactionCommitTimeout(false); + } + + public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable { dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testTransactionCommitTimeout"); + "testTransactionCommitTimeout-" + readWrite); waitUntilLeader(shard); @@ -1888,12 +1892,10 @@ public class ShardTest extends AbstractShardTest { // Ready the Tx's - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -1957,18 +1959,15 @@ public class ShardTest extends AbstractShardTest { // Ready the Tx's - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // The 3rd Tx should exceed queue capacity and fail. - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); // canCommit 1st Tx. @@ -2009,8 +2008,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); final String transactionID2 = "tx2"; @@ -2018,8 +2016,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); final String transactionID3 = "tx3"; @@ -2027,8 +2024,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3); - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // All Tx's are readied. We'll send canCommit for the last one but not the others. The others @@ -2061,8 +2057,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // CanCommit the first one so it's the current in-progress CohortEntry. @@ -2077,8 +2072,7 @@ public class ShardTest extends AbstractShardTest { final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Ready the third Tx. @@ -2125,10 +2119,15 @@ public class ShardTest extends AbstractShardTest { @Test public void testAbortCurrentTransaction() throws Throwable { + testAbortCurrentTransaction(true); + testAbortCurrentTransaction(false); + } + + public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testAbortCurrentTransaction"); + "testAbortCurrentTransaction-" + readWrite); waitUntilLeader(shard); @@ -2148,15 +2147,10 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -2192,6 +2186,11 @@ public class ShardTest extends AbstractShardTest { @Test public void testAbortQueuedTransaction() throws Throwable { + testAbortQueuedTransaction(true); + testAbortQueuedTransaction(false); + } + + public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable { dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) {{ final AtomicReference cleaupCheckLatch = new AtomicReference<>(); @@ -2215,7 +2214,7 @@ public class ShardTest extends AbstractShardTest { final TestActorRef shard = TestActorRef.create(getSystem(), Props.create(new DelegatingShardCreator(creator)).withDispatcher( - Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction"); + Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite); waitUntilLeader(shard); @@ -2229,8 +2228,7 @@ public class ShardTest extends AbstractShardTest { // Ready the tx. - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true, false), getRef()); + shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());