+ final String txId = "tx1";
+ modification.ready();
+ final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
+
+ shard.tell(readyMessage, getRef());
+
+ expectMsgClass(ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
+ assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
+ }};
+ }
+
+ @Test
+ public void testCommitWithPersistenceDisabled() throws Throwable {
+ testCommitWithPersistenceDisabled(true);
+ testCommitWithPersistenceDisabled(false);
+ }
+
+ private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
+ dataStoreContextBuilder.persistent(false);
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWithPersistenceDisabled-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ // Setup a simulated transactions with a mock cohort.
+
+ final String transactionID = "tx";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+ TestModel.TEST_PATH, containerNode, modification);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
+ assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
+ }};
+ }
+
+ @Test
+ public void testCommitWhenTransactionHasNoModifications() {
+ testCommitWhenTransactionHasNoModifications(true);
+ testCommitWhenTransactionHasNoModifications(false);
+ }
+
+ private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
+ // Note that persistence is enabled which would normally result in the entry getting written to the journal
+ // but here that need not happen
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasNoModifications-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+ doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+ final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1,shardStats.getCommittedTransactionsCount());
+
+ // Commit index should not advance because this does not go into the journal
+ assertEquals(-1, shardStats.getCommitIndex());
+ }
+ };
+ }
+
+ @Test
+ public void testCommitWhenTransactionHasModifications() {
+ testCommitWhenTransactionHasModifications(true);
+ testCommitWhenTransactionHasModifications(false);
+ }
+
+ private void testCommitWhenTransactionHasModifications(final boolean readWrite){
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasModifications-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+ doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+ final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1, shardStats.getCommittedTransactionsCount());
+
+ // Commit index should advance as we do not have an empty modification
+ assertEquals(0, shardStats.getCommitIndex());
+ }
+ };
+ }
+
+ @Test
+ public void testCommitPhaseFailure() throws Throwable {
+ testCommitPhaseFailure(true);
+ testCommitPhaseFailure(false);
+ }
+
+ private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitPhaseFailure-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+ // commit phase.
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification1 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
+ doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
+ doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
+
+ final String transactionID2 = "tx2";
+ final MutableCompositeModification modification2 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
+
+ final FiniteDuration duration = duration("5 seconds");
+ final Timeout timeout = new Timeout(duration);
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message for the first Tx.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
+ // processed after the first Tx completes.
+
+ final Future<Object> canCommitFuture = Patterns.ask(shard,
+ new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
+
+ // Send the CommitTransaction message for the first Tx. This should send back an error
+ // and trigger the 2nd Tx to proceed.
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Wait for the 2nd Tx to complete the canCommit phase.
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ canCommitFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable t, final Object resp) {
+ latch.countDown();
+ }
+ }, getSystem().dispatcher());
+
+ assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+
+ final InOrder inOrder = inOrder(cohort1, cohort2);
+ inOrder.verify(cohort1).canCommit();
+ inOrder.verify(cohort1).preCommit();
+ inOrder.verify(cohort1).commit();
+ inOrder.verify(cohort2).canCommit();
+ }};
+ }
+
+ @Test
+ public void testPreCommitPhaseFailure() throws Throwable {
+ testPreCommitPhaseFailure(true);
+ testPreCommitPhaseFailure(false);
+ }
+
+ private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testPreCommitPhaseFailure-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification1 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
+ doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
+
+ final String transactionID2 = "tx2";
+ final MutableCompositeModification modification2 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
+
+ final FiniteDuration duration = duration("5 seconds");
+ final Timeout timeout = new Timeout(duration);
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message for the first Tx.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
+ // processed after the first Tx completes.
+
+ final Future<Object> canCommitFuture = Patterns.ask(shard,
+ new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
+
+ // Send the CommitTransaction message for the first Tx. This should send back an error
+ // and trigger the 2nd Tx to proceed.
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Wait for the 2nd Tx to complete the canCommit phase.
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ canCommitFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable t, final Object resp) {
+ latch.countDown();
+ }
+ }, getSystem().dispatcher());
+
+ assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+
+ final InOrder inOrder = inOrder(cohort1, cohort2);
+ inOrder.verify(cohort1).canCommit();
+ inOrder.verify(cohort1).preCommit();
+ inOrder.verify(cohort2).canCommit();
+ }};
+ }
+
+ @Test
+ public void testCanCommitPhaseFailure() throws Throwable {
+ testCanCommitPhaseFailure(true);
+ testCanCommitPhaseFailure(false);
+ }
+
+ private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitPhaseFailure-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Send another can commit to ensure the failed one got cleaned up.
+
+ reset(cohort);
+
+ final String transactionID2 = "tx2";
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.class));
+ assertEquals("getCanCommit", true, reply.getCanCommit());
+ }};
+ }
+
+ @Test
+ public void testCanCommitPhaseFalseResponse() throws Throwable {
+ testCanCommitPhaseFalseResponse(true);
+ testCanCommitPhaseFalseResponse(false);
+ }
+
+ private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitPhaseFalseResponse-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.class));
+ assertEquals("getCanCommit", false, reply.getCanCommit());
+
+ // Send another can commit to ensure the failed one got cleaned up.
+
+ reset(cohort);
+
+ final String transactionID2 = "tx2";
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ reply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.class));
+ assertEquals("getCanCommit", true, reply.getCanCommit());
+ }};
+ }
+
+ @Test
+ public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
+ testImmediateCommitWithCanCommitPhaseFailure(true);
+ testImmediateCommitWithCanCommitPhaseFailure(false);
+ }
+
+ private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
+
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Send another can commit to ensure the failed one got cleaned up.
+
+ reset(cohort);
+
+ final String transactionID2 = "tx2";
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+ final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
+ final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
+ doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
+ doReturn(candidateRoot).when(candidate).getRootNode();
+ doReturn(candidate).when(cohort).getCandidate();
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
+
+ expectMsgClass(duration, CommitTransactionReply.class);
+ }};
+ }
+
+ @Test
+ public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
+ testImmediateCommitWithCanCommitPhaseFalseResponse(true);
+ testImmediateCommitWithCanCommitPhaseFalseResponse(false);
+ }
+
+ private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final String transactionID = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
+
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Send another can commit to ensure the failed one got cleaned up.
+
+ reset(cohort);
+
+ final String transactionID2 = "tx2";
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+ final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
+ final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
+ doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
+ doReturn(candidateRoot).when(candidate).getRootNode();
+ doReturn(candidate).when(cohort).getCandidate();
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
+
+ expectMsgClass(duration, CommitTransactionReply.class);
+ }};
+ }
+
+ @Test
+ public void testAbortBeforeFinishCommit() throws Throwable {
+ testAbortBeforeFinishCommit(true);
+ testAbortBeforeFinishCommit(false);
+ }
+
+ private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testAbortBeforeFinishCommit-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ final String transactionID = "tx1";
+ final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
+ new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
+ @Override
+ public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
+ final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
+
+ // Simulate an AbortTransaction message occurring during replication, after
+ // persisting and before finishing the commit to the in-memory store.
+ // We have no followers so due to optimizations in the RaftActor, it does not
+ // attempt replication and thus we can't send an AbortTransaction message b/c
+ // it would be processed too late after CommitTransaction completes. So we'll
+ // simulate an AbortTransaction message occurring during replication by calling
+ // the shard directly.
+ //
+ shard.underlyingActor().doAbortTransaction(transactionID, null);
+
+ return preCommitFuture;
+ }
+ };
+
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+ modification, preCommit);
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
+
+ // Since we're simulating an abort occurring during replication and before finish commit,
+ // the data should still get written to the in-memory store since we've gotten past
+ // canCommit and preCommit and persisted the data.
+ assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
+ }};
+ }
+
+ @Test
+ public void testTransactionCommitTimeout() throws Throwable {
+ testTransactionCommitTimeout(true);
+ testTransactionCommitTimeout(false);
+ }
+
+ private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitTimeout-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ writeToStore(shard, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+ // Create 1st Tx - will timeout
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification1 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+ modification1);
+
+ // Create 2nd Tx
+
+ final String transactionID2 = "tx3";
+ final MutableCompositeModification modification2 = new MutableCompositeModification();
+ final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+ final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+ listNodePath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
+ modification2);
+
+ // Ready the Tx's
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // canCommit 1st Tx. We don't send the commit so it should timeout.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
+
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ // Try to commit the 1st Tx - should fail as it's not the current Tx.
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Commit the 2nd Tx.
+
+ shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
+ assertNotNull(listNodePath + " not found", node);
+ }};
+ }
+
+ @Test
+ public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
+ dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
+
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitQueueCapacityExceeded");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification1 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+ final String transactionID2 = "tx2";
+ final MutableCompositeModification modification2 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,