BUG-7033: Implement batchHint in ShardDataTree 98/49498/4
authorTom Pantelis <tpanteli@brocade.com>
Fri, 16 Dec 2016 21:05:15 +0000 (16:05 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 29 Dec 2016 08:16:44 +0000 (08:16 +0000)
Modified ShardDataTree to properly compute the batchHint flag
that is passed to persistData. Basically if the next transaction
in the pendingCommits queue is in the COMMIT_PENDING state then
batchHint is set to true. After the call to persistData the next
transaction in the pendingCommits queue is processed which, if
COMMIT_PENDING, will be batched for replication with the previous
transaction.

Change-Id: I938e98391c69d617901e7e179c93e066667c017d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 23a5a0e20b7a584042956ebae566e82d3f9bde3e..90c47256b4c6b33bdf8e3b4f71eef7d88bb7fe90 100644 (file)
@@ -457,9 +457,9 @@ public class Shard extends RaftActor {
     }
 
     // applyState() will be invoked once consensus is reached on the payload
     }
 
     // applyState() will be invoked once consensus is reached on the payload
-    void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
+    void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) {
         // We are faking the sender
         // We are faking the sender
-        persistData(self(), transactionId, payload, false);
+        persistData(self(), transactionId, payload, batchHint);
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
index 064f6f5d8a476c6451ff9b4bd9bce0c4344549e0..9d65d993bbf5c2484ec66b417a558ac02ae413f9 100644 (file)
@@ -630,6 +630,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         maybeRunOperationOnPendingTransactionsComplete();
     }
 
         maybeRunOperationOnPendingTransactionsComplete();
     }
 
+    private boolean peekNextPendingCommit() {
+        final CommitEntry first = pendingCommits.peek();
+        return first != null && first.cohort.getState() == State.COMMIT_PENDING;
+    }
+
     private void processNextPending() {
         processNextPendingCommit();
         processNextPendingTransaction();
     private void processNextPending() {
         processNextPendingCommit();
         processNextPendingTransaction();
@@ -658,17 +663,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
         final SimpleShardDataTreeCohort current = entry.cohort;
         Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
 
         final SimpleShardDataTreeCohort current = entry.cohort;
         Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
+
+        LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier());
+
         final DataTreeCandidateTip candidate;
         try {
             candidate = tip.prepare(cohort.getDataTreeModification());
         final DataTreeCandidateTip candidate;
         try {
             candidate = tip.prepare(cohort.getDataTreeModification());
-        } catch (Exception e) {
-            failPreCommit(e);
-            return;
-        }
-
-        try {
             cohort.userPreCommit(candidate);
             cohort.userPreCommit(candidate);
-        } catch (ExecutionException | TimeoutException e) {
+        } catch (ExecutionException | TimeoutException | RuntimeException e) {
             failPreCommit(e);
             return;
         }
             failPreCommit(e);
             return;
         }
@@ -680,6 +682,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
         pendingTransactions.remove();
         pendingCommits.add(entry);
 
         pendingTransactions.remove();
         pendingCommits.add(entry);
+
+        LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+
         cohort.successfulPreCommit(candidate);
 
         processNextPendingTransaction();
         cohort.successfulPreCommit(candidate);
 
         processNextPendingTransaction();
@@ -733,6 +738,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
             return;
         }
 
+        LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
+
         if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
             LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
             pendingCommits.remove();
         if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
             LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
             pendingCommits.remove();
@@ -745,22 +752,41 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final Payload payload;
         try {
             payload = CommitTransactionPayload.create(txId, candidate);
         final Payload payload;
         try {
             payload = CommitTransactionPayload.create(txId, candidate);
-
-            // Once completed, we will continue via payloadReplicationComplete
-            entry.lastAccess = shard.ticker().read();
-            shard.persistPayload(txId, payload);
-
-            LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
-
-            pendingCommits.remove();
-            pendingFinishCommits.add(entry);
         } catch (IOException e) {
             LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
             pendingCommits.poll().cohort.failedCommit(e);
         } catch (IOException e) {
             LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
             pendingCommits.poll().cohort.failedCommit(e);
+            processNextPending();
             return;
         }
 
             return;
         }
 
-        processNextPending();
+        // We process next transactions pending canCommit before we call persistPayload to possibly progress subsequent
+        // transactions to the COMMIT_PENDING state so the payloads can be batched for replication. This is done for
+        // single-shard transactions that immediately transition from canCommit to preCommit to commit. Note that
+        // if the next pending transaction is progressed to COMMIT_PENDING and this method (startCommit) is called,
+        // the next transaction will not attempt to replicate b/c the current transaction is still at the head of the
+        // pendingCommits queue.
+        processNextPendingTransaction();
+
+        // After processing next pending transactions, we can now remove the current transaction from pendingCommits.
+        // Note this must be done before the call to peekNextPendingCommit below so we check the next transaction
+        // in order to properly determine the batchHint flag for the call to persistPayload.
+        pendingCommits.remove();
+        pendingFinishCommits.add(entry);
+
+        // See if the next transaction is pending commit (ie in the COMMIT_PENDING state) so it can be batched with
+        // this transaction for replication.
+        boolean replicationBatchHint = peekNextPendingCommit();
+
+        // Once completed, we will continue via payloadReplicationComplete
+        shard.persistPayload(txId, payload, replicationBatchHint);
+
+        entry.lastAccess = shard.ticker().read();
+
+        LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
+
+        // Process the next transaction pending commit, if any. If there is one it will be batched with this
+        // transaction for replication.
+        processNextPendingCommit();
     }
 
     void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
     }
 
     void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
index d273910b6dea0498348c824d79798ed96f6ee7fc..6696df0781a5fcdd3dd42acb7a38722117b8261f 100644 (file)
@@ -87,6 +87,29 @@ public final class ShardDataTreeMocking {
         return callback;
     }
 
         return callback;
     }
 
+    public static FutureCallback<UnsignedLong> immediate3PhaseCommit(final ShardDataTreeCohort cohort) {
+        final FutureCallback<UnsignedLong> commitCallback = mockCallback();
+        doNothing().when(commitCallback).onSuccess(any(UnsignedLong.class));
+        doNothing().when(commitCallback).onFailure(any(Throwable.class));
+
+        final FutureCallback<DataTreeCandidate> preCommitCallback = mockCallback();
+        doAnswer(invocation -> {
+            cohort.commit(commitCallback);
+            return null;
+        }).when(preCommitCallback).onSuccess(any(DataTreeCandidate.class));
+        doNothing().when(preCommitCallback).onFailure(any(Throwable.class));
+
+        final FutureCallback<Void> canCommit = mockCallback();
+        doAnswer(invocation -> {
+            cohort.preCommit(preCommitCallback);
+            return null;
+        }).when(canCommit).onSuccess(null);
+        doNothing().when(canCommit).onFailure(any(Throwable.class));
+
+        cohort.canCommit(canCommit);
+        return commitCallback;
+    }
+
     @SuppressWarnings("unchecked")
     private static <T> Object invokeSuccess(final InvocationOnMock invocation, final T value) {
         invocation.getArgumentAt(0, FutureCallback.class).onSuccess(value);
     @SuppressWarnings("unchecked")
     private static <T> Object invokeSuccess(final InvocationOnMock invocation, final T value) {
         invocation.getArgumentAt(0, FutureCallback.class).onSuccess(value);
index 6b22426c76284c2a1aa5ba0270ee68599daf6093..6f0b32b4506ebce3f3886fe227cfc5afbdf231a9 100644 (file)
@@ -12,6 +12,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doNothing;
@@ -25,6 +26,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCanCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedPreCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCanCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedPreCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
@@ -203,7 +205,7 @@ public class ShardDataTreeTest extends AbstractTest {
     }
 
     @Test
     }
 
     @Test
-    public void testPipelinedTransactions() throws Exception {
+    public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception {
         doReturn(false).when(mockShard).canSkipPayload();
 
         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
         doReturn(false).when(mockShard).canSkipPayload();
 
         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
@@ -241,26 +243,30 @@ public class ShardDataTreeTest extends AbstractTest {
         verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
 
         final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
         verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
 
         final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
-        verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class));
+        verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
+                anyBoolean());
         verifyNoMoreInteractions(commitCallback2);
 
         final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
         verifyNoMoreInteractions(commitCallback2);
 
         final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
-        verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class));
+        verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
+                anyBoolean());
         verifyNoMoreInteractions(commitCallback4);
 
         final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
         InOrder inOrder = inOrder(mockShard);
         verifyNoMoreInteractions(commitCallback4);
 
         final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
         InOrder inOrder = inOrder(mockShard);
-        inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class));
-        inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class));
-        inOrder.verifyNoMoreInteractions();
+        inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
+                eq(true));
+        inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
+                eq(false));
         verifyNoMoreInteractions(commitCallback1);
         verifyNoMoreInteractions(commitCallback2);
 
         final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
         inOrder = inOrder(mockShard);
         verifyNoMoreInteractions(commitCallback1);
         verifyNoMoreInteractions(commitCallback2);
 
         final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
         inOrder = inOrder(mockShard);
-        inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class));
-        inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class));
-        inOrder.verifyNoMoreInteractions();
+        inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
+                eq(true));
+        inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
+                eq(false));
         verifyNoMoreInteractions(commitCallback3);
         verifyNoMoreInteractions(commitCallback4);
 
         verifyNoMoreInteractions(commitCallback3);
         verifyNoMoreInteractions(commitCallback4);
 
@@ -295,6 +301,51 @@ public class ShardDataTreeTest extends AbstractTest {
         assertEquals("People node", peopleNode, optional.get());
     }
 
         assertEquals("People node", peopleNode, optional.get());
     }
 
+    @Test
+    public void testPipelinedTransactionsWithImmediateCommits() throws Exception {
+        doReturn(false).when(mockShard).canSkipPayload();
+
+        final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
+            snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
+
+        final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
+            snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
+
+        YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+        MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100"));
+        final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
+
+        final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
+        final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
+        final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
+
+        InOrder inOrder = inOrder(mockShard);
+        inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
+                eq(true));
+        inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
+                eq(true));
+        inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
+                eq(false));
+
+        // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
+        CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
+                cohort1.getCandidate());
+        shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
+        shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
+        shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
+
+        inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
+        inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
+        inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
+        inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
+
+        final DataTreeSnapshot snapshot =
+                shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
+        Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
+        assertEquals("Car node present", true, optional.isPresent());
+        assertEquals("Car node", carNode, optional.get());
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testAbortWithPendingCommits() throws Exception {
     @SuppressWarnings("unchecked")
     @Test
     public void testAbortWithPendingCommits() throws Exception {
@@ -333,10 +384,12 @@ public class ShardDataTreeTest extends AbstractTest {
         coordinatedCommit(cohort4);
 
         InOrder inOrder = inOrder(mockShard);
         coordinatedCommit(cohort4);
 
         InOrder inOrder = inOrder(mockShard);
-        inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class));
-        inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class));
-        inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class));
-        inOrder.verifyNoMoreInteractions();
+        inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
+                eq(false));
+        inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
+                eq(false));
+        inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
+                eq(false));
 
         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
 
         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
index 562a23765a165696e4b13517e83dd4fc57e4e216..b1e31380a4af6b0eae41e0471e27a60c1d36d722 100644 (file)
@@ -1520,14 +1520,15 @@ public class ShardTest extends AbstractShardTest {
             {
                 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
                     @Override
             {
                 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
                     @Override
-                    void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
+                    void persistPayload(final TransactionIdentifier transactionId, final Payload payload,
+                            boolean batchHint) {
                         // Simulate an AbortTransaction message occurring during
                         // replication, after
                         // persisting and before finishing the commit to the
                         // in-memory store.
 
                         doAbortTransaction(transactionId, null);
                         // Simulate an AbortTransaction message occurring during
                         // replication, after
                         // persisting and before finishing the commit to the
                         // in-memory store.
 
                         doAbortTransaction(transactionId, null);
-                        super.persistPayload(transactionId, payload);
+                        super.persistPayload(transactionId, payload, batchHint);
                     }
                 };
 
                     }
                 };