X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=e8b41d97474f49951b8b67846d22c7a06b73d185;hb=52725324973f22ac0c85ed4fd8459cf0ef504407;hp=e399cd49ab3d79427d10c8f8a723a2164c27e1d2;hpb=8232a626b43fdd2f5799da0fbcfb0f02d3c8f4fb;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index e399cd49ab..e8b41d9747 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Iterables; import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; @@ -31,7 +32,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -60,23 +60,23 @@ import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModifi import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -129,24 +129,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; private final Collection> metadata; - private final TipProducingDataTree dataTree; + private final DataTree dataTree; private final String logContext; private final Shard shard; private Runnable runOnPendingTransactionsComplete; /** * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a - * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another + * {@link DataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself. */ - private TipProducingDataTreeTip tip; + private DataTreeTip tip; private SchemaContext schemaContext; private int currentTransactionBatch; - ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { @@ -166,8 +166,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { - this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root), - treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata); + this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher, + dataChangeListenerPublisher, logContext, metadata); + } + + private static DataTree createDataTree(final TreeType treeType, final YangInstanceIdentifier root) { + final DataTreeConfiguration baseConfig = DataTreeConfiguration.getDefault(treeType); + return new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType()) + .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled()) + .setUniqueIndexes(baseConfig.isUniqueIndexEnabled()) + .setRootPath(root) + .build()); } @VisibleForTesting @@ -185,7 +194,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return shard.ticker().read(); } - public TipProducingDataTree getDataTree() { + public DataTree getDataTree() { return dataTree; } @@ -619,7 +628,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } Optional readCurrentData() { - final Optional> currentState = + final java.util.Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); @@ -662,7 +671,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } public Optional> readNode(final YangInstanceIdentifier path) { - return dataTree.takeSnapshot().readNode(path); + return Optional.fromJavaUtil(dataTree.takeSnapshot().readNode(path)); } DataTreeSnapshot takeSnapshot() { @@ -728,7 +737,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } catch (ConflictingModificationAppliedException e) { LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(), e.getPath()); - cause = new OptimisticLockFailedException("Optimistic lock failed.", e); + cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e); } catch (DataValidationFailedException e) { LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(), e.getPath(), e); @@ -737,7 +746,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // precondition log, it should allow us to understand what went on. LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree); - cause = new TransactionCommitFailedException("Data did not pass validation.", e); + cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e); } catch (Exception e) { LOG.warn("{}: Unexpected failure in validation phase", logContext, e); cause = e; @@ -799,7 +808,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingTransaction(); } - private void failPreCommit(final Exception cause) { + private void failPreCommit(final Throwable cause) { shard.getShardMBean().incrementFailedTransactionsCount(); pendingTransactions.poll().cohort.failedPreCommit(cause); processNextPendingTransaction(); @@ -818,25 +827,34 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeCandidateTip candidate; try { candidate = tip.prepare(cohort.getDataTreeModification()); - cohort.userPreCommit(candidate); - } catch (ExecutionException | TimeoutException | RuntimeException e) { + } catch (RuntimeException e) { failPreCommit(e); return; } - // Set the tip of the data tree. - tip = Verify.verifyNotNull(candidate); + cohort.userPreCommit(candidate, new FutureCallback() { + @Override + public void onSuccess(final Void noop) { + // Set the tip of the data tree. + tip = Verify.verifyNotNull(candidate); - entry.lastAccess = readTime(); + entry.lastAccess = readTime(); - pendingTransactions.remove(); - pendingCommits.add(entry); + pendingTransactions.remove(); + pendingCommits.add(entry); - LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); + LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); - cohort.successfulPreCommit(candidate); + cohort.successfulPreCommit(candidate); - processNextPendingTransaction(); + processNextPendingTransaction(); + } + + @Override + public void onFailure(final Throwable failure) { + failPreCommit(failure); + } + }); } private void failCommit(final Exception cause) { @@ -869,12 +887,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); // FIXME: propagate journal index - pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO); - - LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); - notifyListeners(candidate); + pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO, () -> { + LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); + notifyListeners(candidate); - processNextPending(); + processNextPending(); + }); } void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) { @@ -949,7 +967,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, - cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); + cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable), + COMMIT_STEP_TIMEOUT)); pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } @@ -1092,7 +1111,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return false; } - TipProducingDataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); + DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); while (it.hasNext()) { final CommitEntry e = it.next(); if (cohort.equals(e.cohort)) { @@ -1114,7 +1133,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressWarnings("checkstyle:IllegalCatch") - private void rebaseTransactions(final Iterator iter, @Nonnull final TipProducingDataTreeTip newTip) { + private void rebaseTransactions(final Iterator iter, @Nonnull final DataTreeTip newTip) { tip = Preconditions.checkNotNull(newTip); while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort; @@ -1133,11 +1152,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { try { tip.validate(cohort.getDataTreeModification()); DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification()); - cohort.userPreCommit(candidate); cohort.setNewCandidate(candidate); tip = candidate; - } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) { + } catch (RuntimeException | DataValidationFailedException e) { LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e); cohort.reportFailure(e); }