+ shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Send another can commit to ensure the failed one got cleaned up.
+
+ reset(cohort);
+
+ String transactionID2 = "tx2";
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort, modification, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+ CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("getCanCommit", true, reply.getCanCommit());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testCanCommitPhaseFalseResponse() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitPhaseFalseResponse");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ String transactionID1 = "tx1";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
+
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort, modification, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("getCanCommit", false, reply.getCanCommit());
+
+ // Send another can commit to ensure the failed one got cleaned up.
+
+ reset(cohort);
+
+ String transactionID2 = "tx2";
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort, modification, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+ reply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("getCanCommit", true, reply.getCanCommit());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testImmediateCommitWithCanCommitPhaseFailure");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ String transactionID1 = "tx1";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
+
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort, modification, true, true), getRef());
+
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Send another can commit to ensure the failed one got cleaned up.
+
+ reset(cohort);
+
+ String transactionID2 = "tx2";
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+ DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
+ DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
+ doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
+ doReturn(candidateRoot).when(candidate).getRootNode();
+ doReturn(candidate).when(cohort).getCandidate();
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort, modification, true, true), getRef());
+
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testImmediateCommitWithCanCommitPhaseFalseResponse");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ String transactionID = "tx1";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
+
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true, true), getRef());
+