import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
- SerializationUtils.serializeNormalizedNode(root),
- Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+ SerializationUtils.serializeNormalizedNode(root),
+ Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
return testStore;
}
final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.class));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- // Send the ForwardedReadyTransaction for the next 2 Tx's.
-
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
}
@Test
- public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
+ public void testReadyWithImmediateCommit() throws Exception{
+ testReadyWithImmediateCommit(true);
+ testReadyWithImmediateCommit(false);
+ }
+
+ public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testForwardedReadyTransactionWithImmediateCommit");
+ "testReadyWithImmediateCommit-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, true), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
@Test
public void testCommitWithPersistenceDisabled() throws Throwable {
+ testCommitWithPersistenceDisabled(true);
+ testCommitWithPersistenceDisabled(false);
+ }
+
+ public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.persistent(false);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWithPersistenceDisabled");
+ "testCommitWithPersistenceDisabled-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
}
@Test
- public void testCommitWhenTransactionHasNoModifications(){
+ public void testCommitWhenTransactionHasNoModifications() {
+ testCommitWhenTransactionHasNoModifications(true);
+ testCommitWhenTransactionHasNoModifications(false);
+ }
+
+ public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
// Note that persistence is enabled which would normally result in the entry getting written to the journal
// but here that need not happen
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWhenTransactionHasNoModifications");
+ "testCommitWhenTransactionHasNoModifications-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
}
@Test
- public void testCommitWhenTransactionHasModifications(){
+ public void testCommitWhenTransactionHasModifications() {
+ testCommitWhenTransactionHasModifications(true);
+ testCommitWhenTransactionHasModifications(false);
+ }
+
+ public void testCommitWhenTransactionHasModifications(final boolean readWrite){
new ShardTestKit(getSystem()) {
{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWhenTransactionHasModifications");
+ "testCommitWhenTransactionHasModifications-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
@Test
public void testCommitPhaseFailure() throws Throwable {
+ testCommitPhaseFailure(true);
+ testCommitPhaseFailure(false);
+ }
+
+ public void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitPhaseFailure");
+ "testCommitPhaseFailure-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
@Test
public void testPreCommitPhaseFailure() throws Throwable {
+ testPreCommitPhaseFailure(true);
+ testPreCommitPhaseFailure(false);
+ }
+
+ public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testPreCommitPhaseFailure");
+ "testPreCommitPhaseFailure-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
@Test
public void testCanCommitPhaseFailure() throws Throwable {
+ testCanCommitPhaseFailure(true);
+ testCanCommitPhaseFailure(false);
+ }
+
+ public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCanCommitPhaseFailure");
+ "testCanCommitPhaseFailure-" + readWrite);
waitUntilLeader(shard);
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
final String transactionID2 = "tx2";
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@Test
public void testCanCommitPhaseFalseResponse() throws Throwable {
+ testCanCommitPhaseFalseResponse(true);
+ testCanCommitPhaseFalseResponse(false);
+ }
+
+ public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCanCommitPhaseFalseResponse");
+ "testCanCommitPhaseFalseResponse-" + readWrite);
waitUntilLeader(shard);
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
final String transactionID2 = "tx2";
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
@Test
public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
+ testImmediateCommitWithCanCommitPhaseFailure(true);
+ testImmediateCommitWithCanCommitPhaseFailure(false);
+ }
+
+ public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testImmediateCommitWithCanCommitPhaseFailure");
+ "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
waitUntilLeader(shard);
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort, modification, true, true), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
doReturn(candidateRoot).when(candidate).getRootNode();
doReturn(candidate).when(cohort).getCandidate();
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort, modification, true, true), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
@Test
public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
+ testImmediateCommitWithCanCommitPhaseFalseResponse(true);
+ testImmediateCommitWithCanCommitPhaseFalseResponse(false);
+ }
+
+ public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testImmediateCommitWithCanCommitPhaseFalseResponse");
+ "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
waitUntilLeader(shard);
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, true), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
doReturn(candidateRoot).when(candidate).getRootNode();
doReturn(candidate).when(cohort).getCandidate();
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort, modification, true, true), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
@Test
public void testAbortBeforeFinishCommit() throws Throwable {
+ testAbortBeforeFinishCommit(true);
+ testAbortBeforeFinishCommit(false);
+ }
+
+ public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortBeforeFinishCommit");
+ "testAbortBeforeFinishCommit-" + readWrite);
waitUntilLeader(shard);
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
modification, preCommit);
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
@Test
public void testTransactionCommitTimeout() throws Throwable {
+ testTransactionCommitTimeout(true);
+ testTransactionCommitTimeout(false);
+ }
+
+ public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testTransactionCommitTimeout");
+ "testTransactionCommitTimeout-" + readWrite);
waitUntilLeader(shard);
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// canCommit 1st Tx. We don't send the commit so it should timeout.
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// The 3rd Tx should exceed queue capacity and fail.
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
// canCommit 1st Tx.
final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
final String transactionID2 = "tx2";
final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
final String transactionID3 = "tx3";
final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// All Tx's are readied. We'll send canCommit for the last one but not the others. The others
final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// CanCommit the first one so it's the current in-progress CohortEntry.
final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Ready the third Tx.
@Test
public void testAbortCurrentTransaction() throws Throwable {
+ testAbortCurrentTransaction(true);
+ testAbortCurrentTransaction(false);
+ }
+
+ public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortCurrentTransaction");
+ "testAbortCurrentTransaction-" + readWrite);
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
@Test
public void testAbortQueuedTransaction() throws Throwable {
+ testAbortQueuedTransaction(true);
+ testAbortQueuedTransaction(false);
+ }
+
+ public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
Props.create(new DelegatingShardCreator(creator)).withDispatcher(
- Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+ Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
waitUntilLeader(shard);
// Ready the tx.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true, false), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());