+ @Test
+ public void testBatchedModificationsOnTransactionChain() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testBatchedModificationsOnTransactionChain");
+
+ waitUntilLeader(shard);
+
+ final String transactionChainID = "txChain";
+ final String transactionID1 = "tx1";
+ final String transactionID2 = "tx2";
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ // Send a BatchedModifications to start a chained write transaction and ready it.
+
+ final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
+ containerNode, true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Create a read Tx on the same chain.
+
+ shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
+ transactionChainID, DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
+
+ final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
+
+ getSystem().actorSelection(createReply.getTransactionPath()).tell(
+ new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
+ final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
+ assertEquals("Read node", containerNode, readReply.getNormalizedNode());
+
+ // Commit the write transaction.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ // Verify data in the data store.
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, path);
+ assertEquals("Stored node", containerNode, actualNode);
+ }};
+ }
+
+ @Test
+ public void testOnBatchedModificationsWhenNotLeader() {
+ final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
+ new ShardTestKit(getSystem()) {{
+ final Creator<Shard> creator = new Creator<Shard>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(newShardBuilder()) {
+ @Override
+ protected boolean isLeader() {
+ return overrideLeaderCalls.get() ? false : super.isLeader();
+ }
+
+ @Override
+ protected ActorSelection getLeader() {
+ return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
+ super.getLeader();
+ }
+ };
+ }
+ };
+
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
+
+ waitUntilLeader(shard);
+
+ overrideLeaderCalls.set(true);
+
+ final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
+
+ shard.tell(batched, ActorRef.noSender());
+
+ expectMsgEquals(batched);
+ }};
+ }
+
+ @Test
+ public void testTransactionMessagesWithNoLeader() {
+ new ShardTestKit(getSystem()) {{
+ dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
+ shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionMessagesWithNoLeader");
+
+ waitUntilNoLeader(shard);
+
+ shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
+ Failure failure = expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+
+ shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
+ DataStoreVersions.CURRENT_VERSION, true), getRef());
+ failure = expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+
+ shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
+ failure = expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+ }};
+ }
+
+ @Test
+ public void testReadyWithImmediateCommit() throws Exception{
+ testReadyWithImmediateCommit(true);
+ testReadyWithImmediateCommit(false);
+ }
+
+ private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testReadyWithImmediateCommit-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ final String transactionID = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+ TestModel.TEST_PATH, containerNode, modification);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
+
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
+ assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
+ }};
+ }
+
+ @Test
+ public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testReadyLocalTransactionWithImmediateCommit");
+
+ waitUntilLeader(shard);
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ final DataTreeModification modification = dataStore.newModification();
+
+ final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
+ final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
+
+ final String txId = "tx1";
+ modification.ready();
+ final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+
+ shard.tell(readyMessage, getRef());
+
+ expectMsgClass(CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
+ assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
+ }};
+ }
+
+ @Test
+ public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testReadyLocalTransactionWithThreePhaseCommit");
+
+ waitUntilLeader(shard);
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ final DataTreeModification modification = dataStore.newModification();
+
+ final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
+ final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
+
+ final String txId = "tx1";
+ modification.ready();
+ final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
+
+ shard.tell(readyMessage, getRef());
+
+ expectMsgClass(ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
+ assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
+ }};
+ }
+
+ @Test
+ public void testCommitWithPersistenceDisabled() throws Throwable {
+ testCommitWithPersistenceDisabled(true);
+ testCommitWithPersistenceDisabled(false);
+ }
+
+ private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
+ dataStoreContextBuilder.persistent(false);
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWithPersistenceDisabled-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ // Setup a simulated transactions with a mock cohort.
+
+ final String transactionID = "tx";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+ TestModel.TEST_PATH, containerNode, modification);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
+ assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
+ }};
+ }
+
+ @Test
+ public void testCommitWhenTransactionHasNoModifications() {
+ testCommitWhenTransactionHasNoModifications(true);
+ testCommitWhenTransactionHasNoModifications(false);
+ }
+
+ private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
+ // Note that persistence is enabled which would normally result in the entry getting written to the journal
+ // but here that need not happen
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasNoModifications-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+ doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+ final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1,shardStats.getCommittedTransactionsCount());
+
+ // Commit index should not advance because this does not go into the journal
+ assertEquals(-1, shardStats.getCommitIndex());
+ }
+ };
+ }
+
+ @Test
+ public void testCommitWhenTransactionHasModifications() {
+ testCommitWhenTransactionHasModifications(true);
+ testCommitWhenTransactionHasModifications(false);
+ }
+
+ private void testCommitWhenTransactionHasModifications(final boolean readWrite){
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasModifications-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+ doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+ final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1, shardStats.getCommittedTransactionsCount());
+
+ // Commit index should advance as we do not have an empty modification
+ assertEquals(0, shardStats.getCommitIndex());
+ }
+ };
+ }
+
+ @Test
+ public void testCommitPhaseFailure() throws Throwable {
+ testCommitPhaseFailure(true);
+ testCommitPhaseFailure(false);
+ }
+
+ private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitPhaseFailure-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+ // commit phase.
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification1 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
+ doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
+ doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
+
+ final String transactionID2 = "tx2";
+ final MutableCompositeModification modification2 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
+
+ final FiniteDuration duration = duration("5 seconds");
+ final Timeout timeout = new Timeout(duration);
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message for the first Tx.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
+ // processed after the first Tx completes.
+
+ final Future<Object> canCommitFuture = Patterns.ask(shard,
+ new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
+
+ // Send the CommitTransaction message for the first Tx. This should send back an error
+ // and trigger the 2nd Tx to proceed.
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Wait for the 2nd Tx to complete the canCommit phase.
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ canCommitFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable t, final Object resp) {
+ latch.countDown();
+ }
+ }, getSystem().dispatcher());
+
+ assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+
+ final InOrder inOrder = inOrder(cohort1, cohort2);
+ inOrder.verify(cohort1).canCommit();
+ inOrder.verify(cohort1).preCommit();
+ inOrder.verify(cohort1).commit();
+ inOrder.verify(cohort2).canCommit();
+ }};
+ }
+
+ @Test
+ public void testPreCommitPhaseFailure() throws Throwable {
+ testPreCommitPhaseFailure(true);
+ testPreCommitPhaseFailure(false);
+ }
+
+ private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testPreCommitPhaseFailure-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification1 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
+ doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
+
+ final String transactionID2 = "tx2";
+ final MutableCompositeModification modification2 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
+
+ final FiniteDuration duration = duration("5 seconds");
+ final Timeout timeout = new Timeout(duration);
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message for the first Tx.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
+ // processed after the first Tx completes.
+
+ final Future<Object> canCommitFuture = Patterns.ask(shard,
+ new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
+
+ // Send the CommitTransaction message for the first Tx. This should send back an error
+ // and trigger the 2nd Tx to proceed.
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Wait for the 2nd Tx to complete the canCommit phase.
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ canCommitFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable t, final Object resp) {
+ latch.countDown();
+ }
+ }, getSystem().dispatcher());
+
+ assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+
+ final InOrder inOrder = inOrder(cohort1, cohort2);
+ inOrder.verify(cohort1).canCommit();
+ inOrder.verify(cohort1).preCommit();
+ inOrder.verify(cohort2).canCommit();
+ }};
+ }
+
+ @Test
+ public void testCanCommitPhaseFailure() throws Throwable {
+ testCanCommitPhaseFailure(true);
+ testCanCommitPhaseFailure(false);
+ }
+
+ private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitPhaseFailure-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());