From a0332590d14ab7aad0247ae12bff4205c90cac94 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sat, 24 Oct 2015 18:34:53 +0200 Subject: [PATCH] BUG-7033: Allow ShardDataTree to pipeline transactions InMemoryDataTree gives us the DataTreeTip, which allows another DataTreeModificatoin to be prepared on top an uncommitted one. This allows us to pipeline transactions if we manage our lifecycle properly. For now this feature gives stricter abort validation, and has otherwise no impact. In future, this allows another DataTreeModification to be prepared and queue for persistence before replication of the previous one finishes. Change-Id: I7ab97c906a6403da780800edd335f74c403e5aa4 Signed-off-by: Robert Varga Signed-off-by: Tom Pantelis --- .../cluster/datastore/ShardDataTree.java | 81 ++++++++++++++++--- .../datastore/ShardDataTreeCohort.java | 1 + .../datastore/SimpleShardDataTreeCohort.java | 35 +++++--- .../cluster/datastore/ShardDataTreeTest.java | 27 +++---- .../cluster/datastore/ShardTest.java | 6 +- .../SimpleShardDataTreeCohortTest.java | 4 +- 6 files changed, 115 insertions(+), 39 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index bc9549a640..a04bf62a96 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -64,9 +64,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -108,6 +110,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final Shard shard; private Runnable runOnPendingTransactionsComplete; + /** + * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a + * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another + * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current + * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself. + */ + private TipProducingDataTreeTip tip; + private SchemaContext schemaContext; public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, @@ -122,6 +132,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher); this.logContext = Preconditions.checkNotNull(logContext); this.metadata = ImmutableList.copyOf(metadata); + tip = dataTree; } public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, @@ -505,6 +516,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting @Deprecated public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException { + // Direct modification commit is a utility, which cannot be used while we have transactions in-flight + Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending"); + modification.ready(); dataTree.validate(modification); DataTreeCandidate candidate = dataTree.prepare(modification); @@ -519,6 +533,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } pendingTransactions.clear(); + tip = dataTree; return ret; } @@ -536,7 +551,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { - dataTree.validate(modification); + tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); entry.lastAccess = shard.ticker().read(); @@ -591,7 +606,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); final DataTreeCandidateTip candidate; try { - candidate = dataTree.prepare(cohort.getDataTreeModification()); + candidate = tip.prepare(cohort.getDataTreeModification()); } catch (Exception e) { failPreCommit(e); return; @@ -604,6 +619,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } + // Set the tip of the data tree. + tip = Verify.verifyNotNull(candidate); + entry.lastAccess = shard.ticker().read(); cohort.successfulPreCommit(candidate); } @@ -629,6 +647,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } + // All pending candidates have been committed, reset the tip to the data tree + if (tip == candidate) { + tip = dataTree; + } + shard.getShardMBean().incrementCommittedTransactionCount(); shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); @@ -745,39 +768,77 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - void startAbort(final SimpleShardDataTreeCohort cohort) { + boolean startAbort(final SimpleShardDataTreeCohort cohort) { final Iterator it = pendingTransactions.iterator(); if (!it.hasNext()) { LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier()); - return; + return true; } // First entry is special, as it may already be committing final CommitEntry first = it.next(); if (cohort.equals(first.cohort)) { if (cohort.getState() != State.COMMIT_PENDING) { - LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(), + LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(), cohort.getIdentifier()); - pendingTransactions.remove(); + it.remove(); + rebasePreCommittedTransactions(it, dataTree); processNextTransaction(); - } else { - LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier()); + return true; } - return; + LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier()); + return false; } + TipProducingDataTreeTip newTip = dataTree; while (it.hasNext()) { final CommitEntry e = it.next(); if (cohort.equals(e.cohort)) { LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier()); it.remove(); - return; + rebasePreCommittedTransactions(it, newTip); + return true; + } else { + newTip = cohort.getCandidate(); } } LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier()); + return true; + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void rebasePreCommittedTransactions(Iterator iter, TipProducingDataTreeTip newTip) { + tip = newTip; + while (iter.hasNext()) { + final SimpleShardDataTreeCohort cohort = iter.next().cohort; + if (cohort.getState() == State.CAN_COMMIT_COMPLETE) { + LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier()); + + try { + tip.validate(cohort.getDataTreeModification()); + } catch (DataValidationFailedException | RuntimeException e) { + LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e); + cohort.reportFailure(e); + } + } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) { + LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier()); + + try { + tip.validate(cohort.getDataTreeModification()); + DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification()); + cohort.userPreCommit(candidate); + + cohort.setNewCandidate(candidate); + tip = candidate; + } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) { + LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e); + cohort.reportFailure(e); + } + } + } } void setRunOnPendingTransactionsComplete(final Runnable operation) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java index 4df9dea7db..6cb9badd8d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -16,6 +16,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +@VisibleForTesting public abstract class ShardDataTreeCohort implements Identifiable { public enum State { READY, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java index 0527d013f2..9791a59611 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -13,8 +13,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -28,7 +26,7 @@ import scala.concurrent.Future; final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class); - private static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); + private final DataTreeModification transaction; private final ShardDataTree dataTree; private final TransactionIdentifier transactionId; @@ -58,7 +56,6 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { } @Override - DataTreeModification getDataTreeModification() { return transaction; } @@ -92,21 +89,25 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { } } - @Override - public void abort(final FutureCallback callback) { - dataTree.startAbort(this); + public void abort(final FutureCallback abortCallback) { + if (!dataTree.startAbort(this)) { + abortCallback.onSuccess(null); + return; + } + + candidate = null; state = State.ABORTED; final Optional>> maybeAborts = userCohorts.abort(); if (!maybeAborts.isPresent()) { - callback.onSuccess(null); + abortCallback.onSuccess(null); return; } final Future> aborts = maybeAborts.get(); if (aborts.isCompleted()) { - callback.onSuccess(null); + abortCallback.onSuccess(null); return; } @@ -114,9 +115,9 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { @Override public void onComplete(final Throwable failure, final Iterable objs) { if (failure != null) { - callback.onFailure(failure); + abortCallback.onFailure(failure); } else { - callback.onSuccess(null); + abortCallback.onSuccess(null); } } }, ExecutionContexts.global()); @@ -127,7 +128,12 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { checkState(State.PRE_COMMIT_COMPLETE); this.callback = Preconditions.checkNotNull(newCallback); state = State.COMMIT_PENDING; - dataTree.startCommit(this, candidate); + + if (nextFailure == null) { + dataTree.startCommit(this, candidate); + } else { + failedCommit(nextFailure); + } } private FutureCallback switchState(final State newState) { @@ -139,6 +145,11 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { return ret; } + void setNewCandidate(DataTreeCandidateTip dataTreeCandidate) { + checkState(State.PRE_COMMIT_COMPLETE); + this.candidate = Verify.verifyNotNull(dataTreeCandidate); + } + void successfulCanCommit() { switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java index 5a096acbb1..c382547690 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java @@ -42,7 +42,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; @@ -122,7 +121,7 @@ public class ShardDataTreeTest extends AbstractTest { public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException { final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL); - final List candidates = new ArrayList<>(); + final List candidates = new ArrayList<>(); candidates.add(addCar(shardDataTree)); candidates.add(removeCar(shardDataTree)); @@ -139,7 +138,7 @@ public class ShardDataTreeTest extends AbstractTest { public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException { final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL); - final List candidates = new ArrayList<>(); + final List candidates = new ArrayList<>(); candidates.add(addCar(shardDataTree)); candidates.add(removeCar(shardDataTree)); candidates.add(addCar(shardDataTree)); @@ -197,8 +196,8 @@ public class ShardDataTreeTest extends AbstractTest { } @SuppressWarnings({ "rawtypes", "unchecked" }) - private static void verifyOnDataTreeChanged(DOMDataTreeChangeListener listener, - Consumer callback) { + private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener, + final Consumer callback) { ArgumentCaptor changes = ArgumentCaptor.forClass(Collection.class); verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture()); for (Collection list : changes.getAllValues()) { @@ -222,12 +221,12 @@ public class ShardDataTreeTest extends AbstractTest { return optional.get(); } - private static DataTreeCandidateTip addCar(final ShardDataTree shardDataTree) + private static DataTreeCandidate addCar(final ShardDataTree shardDataTree) throws ExecutionException, InterruptedException { return addCar(shardDataTree, "altima"); } - private static DataTreeCandidateTip addCar(final ShardDataTree shardDataTree, String name) + private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name) throws ExecutionException, InterruptedException { return doTransaction(shardDataTree, snapshot -> { snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()); @@ -236,7 +235,7 @@ public class ShardDataTreeTest extends AbstractTest { }); } - private static DataTreeCandidateTip removeCar(final ShardDataTree shardDataTree) + private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree) throws ExecutionException, InterruptedException { return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima"))); } @@ -246,7 +245,7 @@ public class ShardDataTreeTest extends AbstractTest { void execute(DataTreeModification snapshot); } - private static DataTreeCandidateTip doTransaction(final ShardDataTree shardDataTree, + private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree, final DataTreeOperation operation) throws ExecutionException, InterruptedException { final ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId()); @@ -256,25 +255,25 @@ public class ShardDataTreeTest extends AbstractTest { immediateCanCommit(cohort); immediatePreCommit(cohort); - final DataTreeCandidateTip candidate = cohort.getCandidate(); + final DataTreeCandidate candidate = cohort.getCandidate(); immediateCommit(cohort); return candidate; } - private static DataTreeCandidateTip applyCandidates(final ShardDataTree shardDataTree, - final List candidates) throws ExecutionException, InterruptedException { + private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree, + final List candidates) throws ExecutionException, InterruptedException { final ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId()); final DataTreeModification snapshot = transaction.getSnapshot(); - for (final DataTreeCandidateTip candidateTip : candidates) { + for (final DataTreeCandidate candidateTip : candidates) { DataTreeCandidates.applyToModification(snapshot, candidateTip); } final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction); immediateCanCommit(cohort); immediatePreCommit(cohort); - final DataTreeCandidateTip candidate = cohort.getCandidate(); + final DataTreeCandidate candidate = cohort.getCandidate(); immediateCommit(cohort); return candidate; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 0a392bfd00..f6f6c26c1e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -1345,8 +1345,12 @@ public class ShardTest extends AbstractShardTest { final InOrder inOrder = inOrder(dataTree); inOrder.verify(dataTree).validate(any(DataTreeModification.class)); inOrder.verify(dataTree).prepare(any(DataTreeModification.class)); + + // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock + // validate performs wrapping and we capture that mock + // inOrder.verify(dataTree).validate(any(DataTreeModification.class)); + inOrder.verify(dataTree).commit(any(DataTreeCandidate.class)); - inOrder.verify(dataTree).validate(any(DataTreeModification.class)); } }; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java index 7d2500b716..5fef104eaa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java @@ -230,7 +230,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest { @Test public void testAbort() throws Exception { - doNothing().when(mockShardDataTree).startAbort(cohort); + doReturn(true).when(mockShardDataTree).startAbort(cohort); abort(cohort).get(); verify(mockShardDataTree).startAbort(cohort); @@ -238,7 +238,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest { @Test public void testAbortWithCohorts() throws Exception { - doNothing().when(mockShardDataTree).startAbort(cohort); + doReturn(true).when(mockShardDataTree).startAbort(cohort); final Promise> cohortFuture = akka.dispatch.Futures.promise(); doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort(); -- 2.36.6