X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=064f6f5d8a476c6451ff9b4bd9bce0c4344549e0;hp=f18271ae36671c342d7b5172a9e40c6b6c662d2b;hb=225ff4000ac10d6dbdc2301d8d2165d282721413;hpb=a81d98f692b80c45bce3fe6a87e731abfb012a9f diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index f18271ae36..064f6f5d8a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -10,13 +10,16 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; +import com.google.common.collect.Iterables; import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; @@ -63,9 +66,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -99,6 +104,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final Map transactionChains = new HashMap<>(); private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); private final Queue pendingTransactions = new ArrayDeque<>(); + private final Queue pendingCommits = new ArrayDeque<>(); + private final Queue pendingFinishCommits = new ArrayDeque<>(); private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; private final Collection> metadata; @@ -107,6 +114,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final Shard shard; private Runnable runOnPendingTransactionsComplete; + /** + * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a + * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another + * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current + * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself. + */ + private TipProducingDataTreeTip tip; + private SchemaContext schemaContext; public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, @@ -121,25 +136,32 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher); this.logContext = Preconditions.checkNotNull(logContext); this.metadata = ImmutableList.copyOf(metadata); + tip = dataTree; } public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, + final YangInstanceIdentifier root, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) { - this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType), + this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root), treeChangeListenerPublisher, dataChangeListenerPublisher, logContext); } @VisibleForTesting public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { - this(shard, schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(), + this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, + new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), ""); } - String logContext() { + final String logContext() { return logContext; } + final Ticker ticker() { + return shard.ticker(); + } + public TipProducingDataTree getDataTree() { return dataTree; } @@ -173,11 +195,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build()); } + private boolean anyPendingTransactions() { + return !pendingTransactions.isEmpty() || !pendingCommits.isEmpty() || !pendingFinishCommits.isEmpty(); + } + private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot, final UnaryOperator wrapper) throws DataValidationFailedException { final Stopwatch elapsed = Stopwatch.createStarted(); - if (!pendingTransactions.isEmpty()) { + if (anyPendingTransactions()) { LOG.warn("{}: applying state snapshot with pending transactions", logContext); } @@ -347,7 +373,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } private void payloadReplicationComplete(final TransactionIdentifier txId) { - final CommitEntry current = pendingTransactions.peek(); + final CommitEntry current = pendingFinishCommits.peek(); if (current == null) { LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); return; @@ -463,7 +489,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } int getQueueSize() { - return pendingTransactions.size(); + return pendingTransactions.size() + pendingCommits.size() + pendingFinishCommits.size(); } @Override @@ -500,6 +526,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting @Deprecated public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException { + // Direct modification commit is a utility, which cannot be used while we have transactions in-flight + Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending"); + modification.ready(); dataTree.validate(modification); DataTreeCandidate candidate = dataTree.prepare(modification); @@ -508,22 +537,41 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } public Collection getAndClearPendingTransactions() { - Collection ret = new ArrayList<>(pendingTransactions.size()); + Collection ret = new ArrayList<>(pendingTransactions.size() + pendingCommits.size() + + pendingFinishCommits.size()); + + for (CommitEntry entry: pendingFinishCommits) { + ret.add(entry.cohort); + } + + for (CommitEntry entry: pendingCommits) { + ret.add(entry.cohort); + } + for (CommitEntry entry: pendingTransactions) { ret.add(entry.cohort); } + pendingFinishCommits.clear(); + pendingCommits.clear(); pendingTransactions.clear(); + tip = dataTree; return ret; } @SuppressWarnings("checkstyle:IllegalCatch") - private void processNextTransaction() { + private void processNextPendingTransaction() { while (!pendingTransactions.isEmpty()) { final CommitEntry entry = pendingTransactions.peek(); final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); + if (cohort.isFailed()) { + LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier()); + pendingTransactions.remove(); + continue; + } + if (cohort.getState() != State.CAN_COMMIT_PENDING) { break; } @@ -531,7 +579,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { - dataTree.validate(modification); + tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); entry.lastAccess = shard.ticker().read(); @@ -561,6 +609,32 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { maybeRunOperationOnPendingTransactionsComplete(); } + private void processNextPendingCommit() { + while (!pendingCommits.isEmpty()) { + final CommitEntry entry = pendingCommits.peek(); + final SimpleShardDataTreeCohort cohort = entry.cohort; + + if (cohort.isFailed()) { + LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier()); + pendingCommits.remove(); + continue; + } + + if (cohort.getState() == State.COMMIT_PENDING) { + startCommit(cohort, cohort.getCandidate()); + } + + break; + } + + maybeRunOperationOnPendingTransactionsComplete(); + } + + private void processNextPending() { + processNextPendingCommit(); + processNextPendingTransaction(); + } + void startCanCommit(final SimpleShardDataTreeCohort cohort) { final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort; if (!cohort.equals(current)) { @@ -568,13 +642,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } - processNextTransaction(); + processNextPendingTransaction(); } private void failPreCommit(final Exception cause) { shard.getShardMBean().incrementFailedTransactionsCount(); pendingTransactions.poll().cohort.failedPreCommit(cause); - processNextTransaction(); + processNextPendingTransaction(); } @SuppressWarnings("checkstyle:IllegalCatch") @@ -586,7 +660,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); final DataTreeCandidateTip candidate; try { - candidate = dataTree.prepare(cohort.getDataTreeModification()); + candidate = tip.prepare(cohort.getDataTreeModification()); } catch (Exception e) { failPreCommit(e); return; @@ -599,14 +673,22 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } + // Set the tip of the data tree. + tip = Verify.verifyNotNull(candidate); + entry.lastAccess = shard.ticker().read(); + + pendingTransactions.remove(); + pendingCommits.add(entry); cohort.successfulPreCommit(candidate); + + processNextPendingTransaction(); } private void failCommit(final Exception cause) { shard.getShardMBean().incrementFailedTransactionsCount(); - pendingTransactions.poll().cohort.failedCommit(cause); - processNextTransaction(); + pendingFinishCommits.poll().cohort.failedCommit(cause); + processNextPending(); } @SuppressWarnings("checkstyle:IllegalCatch") @@ -616,6 +698,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Resuming commit of transaction {}", logContext, txId); + if (tip == candidate) { + // All pending candidates have been committed, reset the tip to the data tree. + tip = dataTree; + } + try { dataTree.commit(candidate); } catch (Exception e) { @@ -628,23 +715,28 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); // FIXME: propagate journal index - pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO); + pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO); LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); notifyListeners(candidate); - processNextTransaction(); + processNextPending(); } void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) { - final CommitEntry entry = pendingTransactions.peek(); + final CommitEntry entry = pendingCommits.peek(); Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort); final SimpleShardDataTreeCohort current = entry.cohort; - Verify.verify(cohort.equals(current), "Attempted to commit %s while %s is pending", cohort, current); + if (!cohort.equals(current)) { + LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.getIdentifier()); + return; + } if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) { LOG.debug("{}: No replication required, proceeding to finish commit", logContext); + pendingCommits.remove(); + pendingFinishCommits.add(entry); finishCommit(cohort); return; } @@ -653,16 +745,22 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final Payload payload; try { payload = CommitTransactionPayload.create(txId, candidate); + + // Once completed, we will continue via payloadReplicationComplete + entry.lastAccess = shard.ticker().read(); + shard.persistPayload(txId, payload); + + LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId); + + pendingCommits.remove(); + pendingFinishCommits.add(entry); } catch (IOException e) { LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e); - pendingTransactions.poll().cohort.failedCommit(e); + pendingCommits.poll().cohort.failedCommit(e); return; } - // Once completed, we will continue via payloadReplicationComplete - entry.lastAccess = shard.ticker().read(); - shard.persistPayload(txId, payload); - LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId); + processNextPending(); } void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { @@ -678,28 +776,30 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return cohort; } - @SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED", "DB_DUPLICATE_SWITCH_CLAUSES"}, - justification = "See inline comments below.") + @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = shard.ticker().read(); - final CommitEntry currentTx = pendingTransactions.peek(); + + final Queue currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits : + !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions; + final CommitEntry currentTx = currentQueue.peek(); if (currentTx != null && currentTx.lastAccess + timeout < now) { LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState()); boolean processNext = true; switch (currentTx.cohort.getState()) { case CAN_COMMIT_PENDING: - pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException()); + currentQueue.remove().cohort.failedCanCommit(new TimeoutException()); break; case CAN_COMMIT_COMPLETE: // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code // in PRE_COMMIT_COMPLETE is changed. - pendingTransactions.poll().cohort.reportFailure(new TimeoutException()); + currentQueue.remove().cohort.reportFailure(new TimeoutException()); break; case PRE_COMMIT_PENDING: - pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException()); + currentQueue.remove().cohort.failedPreCommit(new TimeoutException()); break; case PRE_COMMIT_COMPLETE: // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we @@ -719,7 +819,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // In order to make the pre-commit timer working across failovers, though, we need // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately // restart the timer. - pendingTransactions.poll().cohort.reportFailure(new TimeoutException()); + currentQueue.remove().cohort.reportFailure(new TimeoutException()); break; case COMMIT_PENDING: LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, @@ -732,55 +832,94 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { case FAILED: case READY: default: - // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In - // this case, we just want to drop the current entry that expired and thus ignore the return value. - // In fact we really shouldn't hit this case but we handle all enums for completeness. - pendingTransactions.poll(); + currentQueue.remove(); } if (processNext) { - processNextTransaction(); + processNextPending(); } } } - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "See inline comment below.") - void startAbort(final SimpleShardDataTreeCohort cohort) { - final Iterator it = pendingTransactions.iterator(); + boolean startAbort(final SimpleShardDataTreeCohort cohort) { + final Iterator it = Iterables.concat(pendingFinishCommits, pendingCommits, + pendingTransactions).iterator(); if (!it.hasNext()) { LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier()); - return; + return true; } // First entry is special, as it may already be committing final CommitEntry first = it.next(); if (cohort.equals(first.cohort)) { if (cohort.getState() != State.COMMIT_PENDING) { - LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(), + LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(), cohort.getIdentifier()); - // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In - // this case, we've already obtained the head of the queue above via the Iterator and we just want to - // remove it here. - pendingTransactions.poll(); - processNextTransaction(); - } else { - LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier()); + it.remove(); + if (cohort.getCandidate() != null) { + rebaseTransactions(it, dataTree); + } + + processNextPending(); + return true; } - return; + LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier()); + return false; } + TipProducingDataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); while (it.hasNext()) { final CommitEntry e = it.next(); if (cohort.equals(e.cohort)) { LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier()); + it.remove(); - return; + if (cohort.getCandidate() != null) { + rebaseTransactions(it, newTip); + } + + return true; + } else { + newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), dataTree); } } LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier()); + return true; + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void rebaseTransactions(Iterator iter, @Nonnull TipProducingDataTreeTip newTip) { + tip = Preconditions.checkNotNull(newTip); + while (iter.hasNext()) { + final SimpleShardDataTreeCohort cohort = iter.next().cohort; + if (cohort.getState() == State.CAN_COMMIT_COMPLETE) { + LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier()); + + try { + tip.validate(cohort.getDataTreeModification()); + } catch (DataValidationFailedException | RuntimeException e) { + LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e); + cohort.reportFailure(e); + } + } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) { + LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier()); + + try { + tip.validate(cohort.getDataTreeModification()); + DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification()); + cohort.userPreCommit(candidate); + + cohort.setNewCandidate(candidate); + tip = candidate; + } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) { + LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e); + cohort.reportFailure(e); + } + } + } } void setRunOnPendingTransactionsComplete(final Runnable operation) { @@ -789,7 +928,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } private void maybeRunOperationOnPendingTransactionsComplete() { - if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) { + if (runOnPendingTransactionsComplete != null && !anyPendingTransactions()) { LOG.debug("{}: Pending transactions complete - running operation {}", logContext, runOnPendingTransactionsComplete);