- public void testCommitPhaseFailure() throws Throwable {
- new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitPhaseFailure");
-
- waitUntilLeader(shard);
-
- // Setup 2 simulated transactions with mock cohorts. The first one fails in the
- // commit phase.
-
- final String transactionID1 = "tx1";
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
- doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
-
- final String transactionID2 = "tx2";
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
-
- final FiniteDuration duration = duration("5 seconds");
- final Timeout timeout = new Timeout(duration);
-
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- // Send the CanCommitTransaction message for the first Tx.
-
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
-
- // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
- // processed after the first Tx completes.
-
- final Future<Object> canCommitFuture = Patterns.ask(shard,
- new CanCommitTransaction(transactionID2).toSerializable(), timeout);
-
- // Send the CommitTransaction message for the first Tx. This should send back an error
- // and trigger the 2nd Tx to proceed.
-
- shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
- expectMsgClass(duration, akka.actor.Status.Failure.class);
-
- // Wait for the 2nd Tx to complete the canCommit phase.
-
- final CountDownLatch latch = new CountDownLatch(1);
- canCommitFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable t, final Object resp) {
- latch.countDown();
- }
- }, getSystem().dispatcher());
-
- assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
-
- final InOrder inOrder = inOrder(cohort1, cohort2);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort1).commit();
- inOrder.verify(cohort2).canCommit();
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }};
- }
-
- @Test
- public void testPreCommitPhaseFailure() throws Throwable {
- new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testPreCommitPhaseFailure");
-
- waitUntilLeader(shard);
-
- final String transactionID1 = "tx1";
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
-
- final String transactionID2 = "tx2";
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
-
- final FiniteDuration duration = duration("5 seconds");
- final Timeout timeout = new Timeout(duration);
-
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true, false), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true, false), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- // Send the CanCommitTransaction message for the first Tx.
-
- shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
-
- // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
- // processed after the first Tx completes.
-
- final Future<Object> canCommitFuture = Patterns.ask(shard,
- new CanCommitTransaction(transactionID2).toSerializable(), timeout);
-
- // Send the CommitTransaction message for the first Tx. This should send back an error
- // and trigger the 2nd Tx to proceed.
-
- shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
- expectMsgClass(duration, akka.actor.Status.Failure.class);
-
- // Wait for the 2nd Tx to complete the canCommit phase.
-
- final CountDownLatch latch = new CountDownLatch(1);
- canCommitFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable t, final Object resp) {
- latch.countDown();
- }
- }, getSystem().dispatcher());
-
- assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
-
- final InOrder inOrder = inOrder(cohort1, cohort2);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort2).canCommit();
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }};