BUG-7033: Remove payload replication short-circuits 71/49971/2
authorTom Pantelis <tpanteli@brocade.com>
Tue, 3 Jan 2017 12:53:08 +0000 (07:53 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 4 Jan 2017 12:12:16 +0000 (12:12 +0000)
Removed the code paths in ShardDataTree#startCommit whereby it
short-circuits the call to persistPayload. This simplifies the
code in ShardDataTree. For UNMODIFIED DataTreeCandidates we still
want to replicate so followers have an accurate view of transaction
states. For the case where persistence is disabled and there are no
followers, I modified persistPayload to check those conditions and
short-circuit.

Change-Id: Ifb940f950b076b5f9cedd91c42d3e065db92a7c5
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 90c4725..84c399d 100644 (file)
@@ -450,16 +450,15 @@ public class Shard extends RaftActor {
         updateConfigParams(datastoreContext.getShardRaftConfig());
     }
 
-    boolean canSkipPayload() {
-        // If we do not have any followers and we are not using persistence we can apply modification to the state
-        // immediately
-        return !hasFollowers() && !persistence().isRecoveryApplicable();
-    }
-
     // applyState() will be invoked once consensus is reached on the payload
     void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) {
-        // We are faking the sender
-        persistData(self(), transactionId, payload, batchHint);
+        boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+        if (canSkipPayload) {
+            applyState(self(), transactionId, payload);
+        } else {
+            // We are faking the sender
+            persistData(self(), transactionId, payload, batchHint);
+        }
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
index 78b49a6..2821912 100644 (file)
@@ -69,7 +69,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
@@ -597,7 +596,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     private void processNextPending() {
-        processNextPendingFinishCommit();
         processNextPendingCommit();
         processNextPendingTransaction();
     }
@@ -628,11 +626,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             entry -> startCommit(entry.cohort, entry.cohort.getCandidate()));
     }
 
-    private void processNextPendingFinishCommit() {
-        processNextPending(pendingFinishCommits, State.FINISH_COMMIT_PENDING,
-            entry -> payloadReplicationComplete(entry.cohort.getIdentifier()));
-    }
-
     private boolean peekNextPendingCommit() {
         final CommitEntry first = pendingCommits.peek();
         return first != null && first.cohort.getState() == State.COMMIT_PENDING;
@@ -739,15 +732,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
 
         final TransactionIdentifier txId = cohort.getIdentifier();
-        if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
-            LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
-            pendingCommits.remove();
-            pendingFinishCommits.add(entry);
-            cohort.finishCommitPending();
-            payloadReplicationComplete(txId);
-            return;
-        }
-
         final Payload payload;
         try {
             payload = CommitTransactionPayload.create(txId, candidate);
index 197c90a..8d947e8 100644 (file)
@@ -212,12 +212,6 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         switchState(State.FAILED).onFailure(cause);
     }
 
-    void finishCommitPending() {
-        checkState(State.COMMIT_PENDING);
-        // We want to switch the state but keep the callback.
-        callback = switchState(State.FINISH_COMMIT_PENDING);
-    }
-
     @Override
     public State getState() {
         return state;
index 6696df0..e2dc814 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.inOrder;
@@ -20,6 +21,9 @@ import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
 import org.mockito.InOrder;
 import org.mockito.invocation.InvocationOnMock;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 public final class ShardDataTreeMocking {
@@ -188,4 +192,13 @@ public final class ShardDataTreeMocking {
         inOrder.verify(mock).preCommit(any(FutureCallback.class));
         inOrder.verify(mock).commit(any(FutureCallback.class));
     }
+
+    public static void immediatePayloadReplication(final ShardDataTree shardDataTree, final Shard mockShard) {
+        doAnswer(invocation -> {
+            shardDataTree.applyReplicatedPayload(invocation.getArgumentAt(0, TransactionIdentifier.class),
+                    invocation.getArgumentAt(1, Payload.class));
+            return null;
+        }).when(mockShard).persistPayload(any(TransactionIdentifier.class), any(CommitTransactionPayload.class),
+                anyBoolean());
+    }
 }
index 9c7442b..42d0337 100644 (file)
@@ -29,6 +29,7 @@ import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePayloadReplication;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
 
 import com.google.common.base.Optional;
@@ -73,7 +74,6 @@ public class ShardDataTreeTest extends AbstractTest {
 
     @Before
     public void setUp() {
-        doReturn(true).when(mockShard).canSkipPayload();
         doReturn(Ticker.systemTicker()).when(mockShard).ticker();
         doReturn(Mockito.mock(ShardStats.class)).when(mockShard).getShardMBean();
 
@@ -94,6 +94,7 @@ public class ShardDataTreeTest extends AbstractTest {
 
     private void modify(final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent)
             throws ExecutionException, InterruptedException {
+        immediatePayloadReplication(shardDataTree, mockShard);
 
         assertEquals(fullSchema, shardDataTree.getSchemaContext());
 
@@ -134,6 +135,8 @@ public class ShardDataTreeTest extends AbstractTest {
 
     @Test
     public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException {
+        immediatePayloadReplication(shardDataTree, mockShard);
+
         final List<DataTreeCandidate> candidates = new ArrayList<>();
         candidates.add(addCar(shardDataTree));
         candidates.add(removeCar(shardDataTree));
@@ -149,6 +152,8 @@ public class ShardDataTreeTest extends AbstractTest {
 
     @Test
     public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException {
+        immediatePayloadReplication(shardDataTree, mockShard);
+
         final List<DataTreeCandidate> candidates = new ArrayList<>();
         candidates.add(addCar(shardDataTree));
         candidates.add(removeCar(shardDataTree));
@@ -166,6 +171,8 @@ public class ShardDataTreeTest extends AbstractTest {
 
     @Test
     public void testListenerNotifiedOnApplySnapshot() throws Exception {
+        immediatePayloadReplication(shardDataTree, mockShard);
+
         DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
         shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener);
 
@@ -184,6 +191,7 @@ public class ShardDataTreeTest extends AbstractTest {
         });
 
         ShardDataTree newDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
+        immediatePayloadReplication(newDataTree, mockShard);
         addCar(newDataTree, "optima");
         addCar(newDataTree, "murano");
 
@@ -206,8 +214,6 @@ public class ShardDataTreeTest extends AbstractTest {
 
     @Test
     public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception {
-        doReturn(false).when(mockShard).canSkipPayload();
-
         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
 
@@ -303,8 +309,6 @@ public class ShardDataTreeTest extends AbstractTest {
 
     @Test
     public void testPipelinedTransactionsWithImmediateCommits() throws Exception {
-        doReturn(false).when(mockShard).canSkipPayload();
-
         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
 
@@ -347,31 +351,27 @@ public class ShardDataTreeTest extends AbstractTest {
     }
 
     @Test
-    public void testPipelinedTransactionsWithUnmodifiedCandidate() throws Exception {
-        doReturn(false).when(mockShard).canSkipPayload();
+    public void testPipelinedTransactionsWithImmediateReplication() throws Exception {
+        immediatePayloadReplication(shardDataTree, mockShard);
 
         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
 
         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
-            snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
-
-        final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
+            snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
 
-        verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), eq(false));
+        YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+        MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100"));
+        final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
 
+        final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
         final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
+        final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
 
-        verify(mockShard, never()).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
-                anyBoolean());
-
-        // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
-        shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(),
-                CommitTransactionPayload.create(nextTransactionId(), cohort1.getCandidate()));
-
-        InOrder inOrder = inOrder(commitCallback1, commitCallback2);
+        InOrder inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
+        inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
 
         final DataTreeSnapshot snapshot =
                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
@@ -382,8 +382,6 @@ public class ShardDataTreeTest extends AbstractTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testAbortWithPendingCommits() throws Exception {
-        doReturn(false).when(mockShard).canSkipPayload();
-
         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
 
@@ -441,6 +439,8 @@ public class ShardDataTreeTest extends AbstractTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testAbortWithFailedRebase() throws Exception {
+        immediatePayloadReplication(shardDataTree, mockShard);
+
         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
 
index b1e3138..ddd2cb0 100644 (file)
@@ -63,7 +63,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
@@ -76,7 +75,6 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeCh
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
@@ -1154,67 +1152,6 @@ public class ShardTest extends AbstractShardTest {
         };
     }
 
-    @Test
-    public void testReadWriteCommitWhenTransactionHasNoModifications() {
-        testCommitWhenTransactionHasNoModifications(true);
-    }
-
-    @Test
-    public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
-        testCommitWhenTransactionHasNoModifications(false);
-    }
-
-    private void testCommitWhenTransactionHasNoModifications(final boolean readWrite) {
-        // Note that persistence is enabled which would normally result in the
-        // entry getting written to the journal
-        // but here that need not happen
-        new ShardTestKit(getSystem()) {
-            {
-                final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testCommitWhenTransactionHasNoModifications-" + readWrite);
-
-                waitUntilLeader(shard);
-
-                final TransactionIdentifier transactionID = nextTransactionId();
-
-                final FiniteDuration duration = duration("5 seconds");
-
-                if (readWrite) {
-                    final ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore()
-                            .newReadWriteTransaction(transactionID);
-                    shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef());
-                } else {
-                    shard.tell(prepareBatchedModifications(transactionID, new MutableCompositeModification()),
-                            getRef());
-                }
-
-                expectMsgClass(duration, ReadyTransactionReply.class);
-
-                // Send the CanCommitTransaction message.
-
-                shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-                final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
-                        .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
-                assertEquals("Can commit", true, canCommitReply.getCanCommit());
-
-                shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-                expectMsgClass(duration, CommitTransactionReply.class);
-
-                shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
-                final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
-
-                // Use MBean for verification
-                // Committed transaction count should increase as usual
-                assertEquals(1, shardStats.getCommittedTransactionsCount());
-
-                // Commit index should not advance because this does not go into
-                // the journal
-                assertEquals(-1, shardStats.getCommitIndex());
-            }
-        };
-    }
-
     @Test
     public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
         testCommitWhenTransactionHasModifications(true);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.