From: Tom Pantelis Date: Tue, 24 Mar 2015 12:39:29 +0000 (+0000) Subject: Merge "BUG 2854 : Do not add empty read write transactions to the replicable journal" X-Git-Tag: release/lithium~360 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1e884647502a8d91f8a57bde8193c60b9bbcce0d;hp=919145b1bf7d68e436efa9b22c174965005a174a Merge "BUG 2854 : Do not add empty read write transactions to the replicable journal" --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 94be1b0dc1..ff0f4592cb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -374,9 +374,10 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().preCommit().get(); - // If we do not have any followers and we are not using persistence we can - // apply modification to the state immediately - if(!hasFollowers() && !persistence().isRecoveryApplicable()){ + // If we do not have any followers and we are not using persistence + // or if cohortEntry has no modifications + // we can apply modification to the state immediately + if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){ applyModificationToState(getSender(), transactionID, cohortEntry.getModification()); } else { Shard.this.persistData(getSender(), transactionID, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 951bc22545..5d0ca38d6a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -17,6 +17,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.modification.CompositeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.slf4j.Logger; @@ -265,5 +266,12 @@ public class ShardCommitCoordinator { void setShard(ActorRef shard) { this.shard = shard; } + + boolean hasModifications(){ + if(modification instanceof CompositeModification){ + return ((CompositeModification) modification).getModifications().size() > 0; + } + return true; + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index e10f566677..a87000136f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -53,6 +53,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; @@ -82,6 +83,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; @@ -652,6 +654,117 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testCommitWhenTransactionHasNoModifications(){ + // Note that persistence is enabled which would normally result in the entry getting written to the journal + // but here that need not happen + new ShardTestKit(getSystem()) { + { + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCommitWhenTransactionHasNoModifications"); + + waitUntilLeader(shard); + + String transactionID = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + + FiniteDuration duration = duration("5 seconds"); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + + // Send the CanCommitTransaction message. + + shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); + expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + + InOrder inOrder = inOrder(cohort); + inOrder.verify(cohort).canCommit(); + inOrder.verify(cohort).preCommit(); + inOrder.verify(cohort).commit(); + + // Use MBean for verification + // Committed transaction count should increase as usual + assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + + // Commit index should not advance because this does not go into the journal + assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + } + }; + } + + @Test + public void testCommitWhenTransactionHasModifications(){ + new ShardTestKit(getSystem()) { + { + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCommitWhenTransactionHasModifications"); + + waitUntilLeader(shard); + + String transactionID = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build())); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + + FiniteDuration duration = duration("5 seconds"); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + + // Send the CanCommitTransaction message. + + shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); + expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + + InOrder inOrder = inOrder(cohort); + inOrder.verify(cohort).canCommit(); + inOrder.verify(cohort).preCommit(); + inOrder.verify(cohort).commit(); + + // Use MBean for verification + // Committed transaction count should increase as usual + assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + + // Commit index should advance as we do not have an empty modification + assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + } + }; + } + @Test public void testCommitPhaseFailure() throws Throwable { new ShardTestKit(getSystem()) {{