BUG 2854 : Do not add empty read write transactions to the replicable journal 26/16726/1
authorMoiz Raja <moraja@cisco.com>
Tue, 17 Mar 2015 01:17:12 +0000 (18:17 -0700)
committerMoiz Raja <moraja@cisco.com>
Tue, 17 Mar 2015 23:32:32 +0000 (16:32 -0700)
Change-Id: I7af5689975de64331fd9739d4ae682fdff1940ee
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 94be1b0dc1db9bec4211a801e98d6ed568576b32..ff0f4592cba972261660c5ea1db2c3e415a02608 100644 (file)
@@ -374,9 +374,10 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().preCommit().get();
 
-            // If we do not have any followers and we are not using persistence we can
-            // apply modification to the state immediately
-            if(!hasFollowers() && !persistence().isRecoveryApplicable()){
+            // If we do not have any followers and we are not using persistence
+            // or if cohortEntry has no modifications
+            // we can apply modification to the state immediately
+            if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
                 applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
             } else {
                 Shard.this.persistData(getSender(), transactionID,
index 951bc22545804af11bbc37fd8ef28d03cd53fc02..5d0ca38d6a2f1a1398161c7e73246a47f7b216f6 100644 (file)
@@ -17,6 +17,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.slf4j.Logger;
@@ -265,5 +266,12 @@ public class ShardCommitCoordinator {
         void setShard(ActorRef shard) {
             this.shard = shard;
         }
+
+        boolean hasModifications(){
+            if(modification instanceof CompositeModification){
+                return ((CompositeModification) modification).getModifications().size() > 0;
+            }
+            return true;
+        }
     }
 }
index f38b510947a271e0c89e5a5881d67b0309ce866d..00e7a56c01106e1c2712a5a8d44cd29abc32e1ab 100644 (file)
@@ -53,6 +53,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
@@ -82,6 +83,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
@@ -652,6 +654,117 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test
+    public void testCommitWhenTransactionHasNoModifications(){
+        // Note that persistence is enabled which would normally result in the entry getting written to the journal
+        // but here that need not happen
+        new ShardTestKit(getSystem()) {
+            {
+                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        "testCommitWhenTransactionHasNoModifications");
+
+                waitUntilLeader(shard);
+
+                String transactionID = "tx1";
+                MutableCompositeModification modification = new MutableCompositeModification();
+                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+
+                FiniteDuration duration = duration("5 seconds");
+
+                // Simulate the ForwardedReadyTransaction messages that would be sent
+                // by the ShardTransaction.
+
+                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                        cohort, modification, true), getRef());
+                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+                // Send the CanCommitTransaction message.
+
+                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+                InOrder inOrder = inOrder(cohort);
+                inOrder.verify(cohort).canCommit();
+                inOrder.verify(cohort).preCommit();
+                inOrder.verify(cohort).commit();
+
+                // Use MBean for verification
+                // Committed transaction count should increase as usual
+                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+
+                // Commit index should not advance because this does not go into the journal
+                assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
+
+                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+            }
+        };
+    }
+
+    @Test
+    public void testCommitWhenTransactionHasModifications(){
+        new ShardTestKit(getSystem()) {
+            {
+                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        "testCommitWhenTransactionHasModifications");
+
+                waitUntilLeader(shard);
+
+                String transactionID = "tx1";
+                MutableCompositeModification modification = new MutableCompositeModification();
+                modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
+                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+
+                FiniteDuration duration = duration("5 seconds");
+
+                // Simulate the ForwardedReadyTransaction messages that would be sent
+                // by the ShardTransaction.
+
+                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                        cohort, modification, true), getRef());
+                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+                // Send the CanCommitTransaction message.
+
+                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+                CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+                assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+                InOrder inOrder = inOrder(cohort);
+                inOrder.verify(cohort).canCommit();
+                inOrder.verify(cohort).preCommit();
+                inOrder.verify(cohort).commit();
+
+                // Use MBean for verification
+                // Committed transaction count should increase as usual
+                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+
+                // Commit index should advance as we do not have an empty modification
+                assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
+
+                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+            }
+        };
+    }
+
     @Test
     public void testCommitPhaseFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{