+ final String transactionID = "tx";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+ TestModel.TEST_PATH, containerNode, modification);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
+ assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
+ }};
+ }
+
+ @Test
+ public void testReadWriteCommitWhenTransactionHasNoModifications() {
+ testCommitWhenTransactionHasNoModifications(true);
+ }
+
+ @Test
+ public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
+ testCommitWhenTransactionHasNoModifications(false);
+ }
+
+ private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
+ // Note that persistence is enabled which would normally result in the entry getting written to the journal
+ // but here that need not happen
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasNoModifications-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+ doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+ final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1,shardStats.getCommittedTransactionsCount());
+
+ // Commit index should not advance because this does not go into the journal
+ assertEquals(-1, shardStats.getCommitIndex());
+ }
+ };
+ }
+
+ @Test
+ public void testReadWriteCommitWhenTransactionHasModifications() {
+ testCommitWhenTransactionHasModifications(true);
+ }
+
+ @Test
+ public void testWriteOnlyCommitWhenTransactionHasModifications() {
+ testCommitWhenTransactionHasModifications(false);
+ }
+
+ private void testCommitWhenTransactionHasModifications(final boolean readWrite){
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasModifications-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx1";
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+ doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+ final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1, shardStats.getCommittedTransactionsCount());
+
+ // Commit index should advance as we do not have an empty modification
+ assertEquals(0, shardStats.getCommitIndex());
+ }
+ };
+ }
+
+ @Test
+ public void testCommitPhaseFailure() throws Throwable {
+ testCommitPhaseFailure(true);
+ testCommitPhaseFailure(false);
+ }
+
+ private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitPhaseFailure-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+ // commit phase.
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification1 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
+ doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
+ doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
+
+ final String transactionID2 = "tx2";
+ final MutableCompositeModification modification2 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");