BUG-7033: Allow ShardDataTree to pipeline transactions 75/28775/21
authorRobert Varga <rovarga@cisco.com>
Sat, 24 Oct 2015 16:34:53 +0000 (18:34 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 15 Dec 2016 18:53:45 +0000 (18:53 +0000)
InMemoryDataTree gives us the DataTreeTip, which allows another
DataTreeModificatoin to be prepared on top an uncommitted one. This
allows us to pipeline transactions if we manage our lifecycle properly.

For now this feature gives stricter abort validation, and has otherwise
no impact. In future, this allows another DataTreeModification to be
prepared and queue for persistence before replication of the previous
one finishes.

Change-Id: I7ab97c906a6403da780800edd335f74c403e5aa4
Signed-off-by: Robert Varga <rovarga@cisco.com>
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java

index bc9549a..a04bf62 100644 (file)
@@ -64,9 +64,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -108,6 +110,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private final Shard shard;
     private Runnable runOnPendingTransactionsComplete;
 
+    /**
+     * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a
+     * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another
+     * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current
+     * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself.
+     */
+    private TipProducingDataTreeTip tip;
+
     private SchemaContext schemaContext;
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
@@ -122,6 +132,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
         this.logContext = Preconditions.checkNotNull(logContext);
         this.metadata = ImmutableList.copyOf(metadata);
+        tip = dataTree;
     }
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
@@ -505,6 +516,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     @Deprecated
     public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
+        // Direct modification commit is a utility, which cannot be used while we have transactions in-flight
+        Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending");
+
         modification.ready();
         dataTree.validate(modification);
         DataTreeCandidate candidate = dataTree.prepare(modification);
@@ -519,6 +533,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
 
         pendingTransactions.clear();
+        tip = dataTree;
         return ret;
     }
 
@@ -536,7 +551,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
-                dataTree.validate(modification);
+                tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
                 entry.lastAccess = shard.ticker().read();
@@ -591,7 +606,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
         final DataTreeCandidateTip candidate;
         try {
-            candidate = dataTree.prepare(cohort.getDataTreeModification());
+            candidate = tip.prepare(cohort.getDataTreeModification());
         } catch (Exception e) {
             failPreCommit(e);
             return;
@@ -604,6 +619,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        // Set the tip of the data tree.
+        tip = Verify.verifyNotNull(candidate);
+
         entry.lastAccess = shard.ticker().read();
         cohort.successfulPreCommit(candidate);
     }
@@ -629,6 +647,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        // All pending candidates have been committed, reset the tip to the data tree
+        if (tip == candidate) {
+            tip = dataTree;
+        }
+
         shard.getShardMBean().incrementCommittedTransactionCount();
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
@@ -745,39 +768,77 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    void startAbort(final SimpleShardDataTreeCohort cohort) {
+    boolean startAbort(final SimpleShardDataTreeCohort cohort) {
         final Iterator<CommitEntry> it = pendingTransactions.iterator();
         if (!it.hasNext()) {
             LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
-            return;
+            return true;
         }
 
         // First entry is special, as it may already be committing
         final CommitEntry first = it.next();
         if (cohort.equals(first.cohort)) {
             if (cohort.getState() != State.COMMIT_PENDING) {
-                LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
+                LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
                     cohort.getIdentifier());
 
-                pendingTransactions.remove();
+                it.remove();
+                rebasePreCommittedTransactions(it, dataTree);
                 processNextTransaction();
-            } else {
-                LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+                return true;
             }
 
-            return;
+            LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+            return false;
         }
 
+        TipProducingDataTreeTip newTip = dataTree;
         while (it.hasNext()) {
             final CommitEntry e = it.next();
             if (cohort.equals(e.cohort)) {
                 LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
                 it.remove();
-                return;
+                rebasePreCommittedTransactions(it, newTip);
+                return true;
+            } else {
+                newTip = cohort.getCandidate();
             }
         }
 
         LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+        return true;
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void rebasePreCommittedTransactions(Iterator<CommitEntry> iter, TipProducingDataTreeTip newTip) {
+        tip = newTip;
+        while (iter.hasNext()) {
+            final SimpleShardDataTreeCohort cohort = iter.next().cohort;
+            if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
+                LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+
+                try {
+                    tip.validate(cohort.getDataTreeModification());
+                } catch (DataValidationFailedException | RuntimeException e) {
+                    LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+                    cohort.reportFailure(e);
+                }
+            } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
+                LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+
+                try {
+                    tip.validate(cohort.getDataTreeModification());
+                    DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
+                    cohort.userPreCommit(candidate);
+
+                    cohort.setNewCandidate(candidate);
+                    tip = candidate;
+                } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+                    LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+                    cohort.reportFailure(e);
+                }
+            }
+        }
     }
 
     void setRunOnPendingTransactionsComplete(final Runnable operation) {
index 4df9dea..6cb9bad 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
+@VisibleForTesting
 public abstract class ShardDataTreeCohort implements Identifiable<TransactionIdentifier> {
     public enum State {
         READY,
index 0527d01..9791a59 100644 (file)
@@ -13,8 +13,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -28,7 +26,7 @@ import scala.concurrent.Future;
 
 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
-    private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
+
     private final DataTreeModification transaction;
     private final ShardDataTree dataTree;
     private final TransactionIdentifier transactionId;
@@ -58,7 +56,6 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     }
 
     @Override
-
     DataTreeModification getDataTreeModification() {
         return transaction;
     }
@@ -92,21 +89,25 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         }
     }
 
-
     @Override
-    public void abort(final FutureCallback<Void> callback) {
-        dataTree.startAbort(this);
+    public void abort(final FutureCallback<Void> abortCallback) {
+        if (!dataTree.startAbort(this)) {
+            abortCallback.onSuccess(null);
+            return;
+        }
+
+        candidate = null;
         state = State.ABORTED;
 
         final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
         if (!maybeAborts.isPresent()) {
-            callback.onSuccess(null);
+            abortCallback.onSuccess(null);
             return;
         }
 
         final Future<Iterable<Object>> aborts = maybeAborts.get();
         if (aborts.isCompleted()) {
-            callback.onSuccess(null);
+            abortCallback.onSuccess(null);
             return;
         }
 
@@ -114,9 +115,9 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
             @Override
             public void onComplete(final Throwable failure, final Iterable<Object> objs) {
                 if (failure != null) {
-                    callback.onFailure(failure);
+                    abortCallback.onFailure(failure);
                 } else {
-                    callback.onSuccess(null);
+                    abortCallback.onSuccess(null);
                 }
             }
         }, ExecutionContexts.global());
@@ -127,7 +128,12 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         checkState(State.PRE_COMMIT_COMPLETE);
         this.callback = Preconditions.checkNotNull(newCallback);
         state = State.COMMIT_PENDING;
-        dataTree.startCommit(this, candidate);
+
+        if (nextFailure == null) {
+            dataTree.startCommit(this, candidate);
+        } else {
+            failedCommit(nextFailure);
+        }
     }
 
     private <T> FutureCallback<T> switchState(final State newState) {
@@ -139,6 +145,11 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         return ret;
     }
 
+    void setNewCandidate(DataTreeCandidateTip dataTreeCandidate) {
+        checkState(State.PRE_COMMIT_COMPLETE);
+        this.candidate = Verify.verifyNotNull(dataTreeCandidate);
+    }
+
     void successfulCanCommit() {
         switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
     }
index 5a096ac..c382547 100644 (file)
@@ -42,7 +42,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
@@ -122,7 +121,7 @@ public class ShardDataTreeTest extends AbstractTest {
     public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException {
         final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
 
-        final List<DataTreeCandidateTip> candidates = new ArrayList<>();
+        final List<DataTreeCandidate> candidates = new ArrayList<>();
         candidates.add(addCar(shardDataTree));
         candidates.add(removeCar(shardDataTree));
 
@@ -139,7 +138,7 @@ public class ShardDataTreeTest extends AbstractTest {
     public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException {
         final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
 
-        final List<DataTreeCandidateTip> candidates = new ArrayList<>();
+        final List<DataTreeCandidate> candidates = new ArrayList<>();
         candidates.add(addCar(shardDataTree));
         candidates.add(removeCar(shardDataTree));
         candidates.add(addCar(shardDataTree));
@@ -197,8 +196,8 @@ public class ShardDataTreeTest extends AbstractTest {
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    private static void verifyOnDataTreeChanged(DOMDataTreeChangeListener listener,
-            Consumer<DataTreeCandidate> callback) {
+    private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
+            final Consumer<DataTreeCandidate> callback) {
         ArgumentCaptor<Collection> changes = ArgumentCaptor.forClass(Collection.class);
         verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
         for (Collection list : changes.getAllValues()) {
@@ -222,12 +221,12 @@ public class ShardDataTreeTest extends AbstractTest {
         return optional.get();
     }
 
-    private static DataTreeCandidateTip addCar(final ShardDataTree shardDataTree)
+    private static DataTreeCandidate addCar(final ShardDataTree shardDataTree)
             throws ExecutionException, InterruptedException {
         return addCar(shardDataTree, "altima");
     }
 
-    private static DataTreeCandidateTip addCar(final ShardDataTree shardDataTree, String name)
+    private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name)
             throws ExecutionException, InterruptedException {
         return doTransaction(shardDataTree, snapshot -> {
             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
@@ -236,7 +235,7 @@ public class ShardDataTreeTest extends AbstractTest {
         });
     }
 
-    private static DataTreeCandidateTip removeCar(final ShardDataTree shardDataTree)
+    private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree)
             throws ExecutionException, InterruptedException {
         return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
     }
@@ -246,7 +245,7 @@ public class ShardDataTreeTest extends AbstractTest {
         void execute(DataTreeModification snapshot);
     }
 
-    private static DataTreeCandidateTip doTransaction(final ShardDataTree shardDataTree,
+    private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
             final DataTreeOperation operation) throws ExecutionException, InterruptedException {
         final ReadWriteShardDataTreeTransaction transaction =
                 shardDataTree.newReadWriteTransaction(nextTransactionId());
@@ -256,25 +255,25 @@ public class ShardDataTreeTest extends AbstractTest {
 
         immediateCanCommit(cohort);
         immediatePreCommit(cohort);
-        final DataTreeCandidateTip candidate = cohort.getCandidate();
+        final DataTreeCandidate candidate = cohort.getCandidate();
         immediateCommit(cohort);
 
         return candidate;
     }
 
-    private static DataTreeCandidateTip applyCandidates(final ShardDataTree shardDataTree,
-            final List<DataTreeCandidateTip> candidates) throws ExecutionException, InterruptedException {
+    private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
+            final List<DataTreeCandidate> candidates) throws ExecutionException, InterruptedException {
         final ReadWriteShardDataTreeTransaction transaction =
                 shardDataTree.newReadWriteTransaction(nextTransactionId());
         final DataTreeModification snapshot = transaction.getSnapshot();
-        for (final DataTreeCandidateTip candidateTip : candidates) {
+        for (final DataTreeCandidate candidateTip : candidates) {
             DataTreeCandidates.applyToModification(snapshot, candidateTip);
         }
         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
 
         immediateCanCommit(cohort);
         immediatePreCommit(cohort);
-        final DataTreeCandidateTip candidate = cohort.getCandidate();
+        final DataTreeCandidate candidate = cohort.getCandidate();
         immediateCommit(cohort);
 
         return candidate;
index 0a392bf..f6f6c26 100644 (file)
@@ -1345,8 +1345,12 @@ public class ShardTest extends AbstractShardTest {
                 final InOrder inOrder = inOrder(dataTree);
                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+
+                // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
+                //        validate performs wrapping and we capture that mock
+                // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+
                 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
-                inOrder.verify(dataTree).validate(any(DataTreeModification.class));
             }
         };
     }
index 7d2500b..5fef104 100644 (file)
@@ -230,7 +230,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
 
     @Test
     public void testAbort() throws Exception {
-        doNothing().when(mockShardDataTree).startAbort(cohort);
+        doReturn(true).when(mockShardDataTree).startAbort(cohort);
 
         abort(cohort).get();
         verify(mockShardDataTree).startAbort(cohort);
@@ -238,7 +238,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
 
     @Test
     public void testAbortWithCohorts() throws Exception {
-        doNothing().when(mockShardDataTree).startAbort(cohort);
+        doReturn(true).when(mockShardDataTree).startAbort(cohort);
 
         final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
         doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort();

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.