X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=27a3359910480986909d2d4b85de1b0f5cfa25ef;hp=7aecda48db6ceaf7ca3a5dbc5ae266847b6676ca;hb=4a97740e7fe14f99dc6f6f2b07e44f4123103ce0;hpb=64bc1360aedb83583edb354444ee3e4295c7a5e6 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 7aecda48db..27a3359910 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -20,21 +20,25 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Iterables; import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; -import java.util.concurrent.ExecutionException; +import java.util.SortedSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.UnaryOperator; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -57,25 +61,23 @@ import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnap import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -101,6 +103,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.cohort = Preconditions.checkNotNull(cohort); lastAccess = now; } + + @Override + public String toString() { + return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]"; + } } private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); @@ -116,7 +123,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final Map transactionChains = new HashMap<>(); private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); - private final Queue pendingTransactions = new ArrayDeque<>(); + private final Deque pendingTransactions = new ArrayDeque<>(); private final Queue pendingCommits = new ArrayDeque<>(); private final Queue pendingFinishCommits = new ArrayDeque<>(); @@ -126,35 +133,33 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final Map replicationCallbacks = new HashMap<>(); private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; - private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; private final Collection> metadata; - private final TipProducingDataTree dataTree; + private final DataTree dataTree; private final String logContext; private final Shard shard; private Runnable runOnPendingTransactionsComplete; /** * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a - * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another + * {@link DataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself. */ - private TipProducingDataTreeTip tip; + private DataTreeTip tip; private SchemaContext schemaContext; private int currentTransactionBatch; - ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, - final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, + final String logContext, final ShardDataTreeMetadata... metadata) { this.dataTree = Preconditions.checkNotNull(dataTree); updateSchemaContext(schemaContext); this.shard = Preconditions.checkNotNull(shard); this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher); - this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher); this.logContext = Preconditions.checkNotNull(logContext); this.metadata = ImmutableList.copyOf(metadata); tip = dataTree; @@ -163,17 +168,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, final YangInstanceIdentifier root, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, - final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, + final String logContext, final ShardDataTreeMetadata... metadata) { - this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root), - treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata); + this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher, logContext, metadata); + } + + private static DataTree createDataTree(final TreeType treeType, final YangInstanceIdentifier root) { + final DataTreeConfiguration baseConfig = DataTreeConfiguration.getDefault(treeType); + return new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType()) + .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled()) + .setUniqueIndexes(baseConfig.isUniqueIndexEnabled()) + .setRootPath(root) + .build()); } @VisibleForTesting public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, - new DefaultShardDataTreeChangeListenerPublisher(""), - new DefaultShardDataChangeListenerPublisher(""), ""); + new DefaultShardDataTreeChangeListenerPublisher(""), ""); } final String logContext() { @@ -184,7 +196,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return shard.ticker().read(); } - public TipProducingDataTree getDataTree() { + public DataTree getDataTree() { return dataTree; } @@ -298,12 +310,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + void applyRecoverySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { applySnapshot(snapshot, this::wrapWithPruning); } @SuppressWarnings("checkstyle:IllegalCatch") - private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException { + private void applyRecoveryCandidate(final DataTreeCandidate candidate) { final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification()); DataTreeCandidates.applyToModification(mod, candidate); mod.ready(); @@ -332,7 +344,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @throws IOException when the snapshot fails to deserialize * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException { + void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException { if (payload instanceof CommitTransactionPayload) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); @@ -353,7 +365,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign) + private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException { LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); @@ -366,6 +378,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeCandidate candidate = dataTree.prepare(mod); dataTree.commit(candidate); + allMetadataCommittedTransaction(identifier); notifyListeners(candidate); } @@ -392,18 +405,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * pre-Boron state -- which limits the number of options here. */ if (payload instanceof CommitTransactionPayload) { - final TransactionIdentifier txId; if (identifier == null) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); - txId = e.getKey(); - applyReplicatedCandidate(txId, e.getValue()); + applyReplicatedCandidate(e.getKey(), e.getValue()); } else { Verify.verify(identifier instanceof TransactionIdentifier); - txId = (TransactionIdentifier) identifier; - payloadReplicationComplete(txId); + payloadReplicationComplete((TransactionIdentifier) identifier); } - allMetadataCommittedTransaction(txId); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); @@ -455,12 +464,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final CommitEntry current = pendingFinishCommits.peek(); if (current == null) { LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); + allMetadataCommittedTransaction(txId); return; } if (!current.cohort.getIdentifier().equals(txId)) { LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext, current.cohort.getIdentifier(), txId); + allMetadataCommittedTransaction(txId); return; } @@ -554,7 +565,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public void notifyListeners(final DataTreeCandidate candidate) { treeChangeListenerPublisher.publishChanges(candidate); - dataChangeListenerPublisher.publishChanges(candidate); } /** @@ -609,16 +619,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); } - void registerDataChangeListener(final YangInstanceIdentifier path, - final AsyncDataChangeListener> listener, - final DataChangeScope scope, final Optional initialState, - final Consumer>>> - onRegistration) { - dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration); - } - Optional readCurrentData() { - final Optional> currentState = + final java.util.Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); @@ -648,11 +650,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction, + final java.util.Optional> participatingShardNames) { final DataTreeModification snapshot = transaction.getSnapshot(); snapshot.ready(); - return createReadyCohort(transaction.getIdentifier(), snapshot); + return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames); } void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { @@ -661,7 +664,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } public Optional> readNode(final YangInstanceIdentifier path) { - return dataTree.takeSnapshot().readNode(path); + return Optional.fromJavaUtil(dataTree.takeSnapshot().readNode(path)); } DataTreeSnapshot takeSnapshot() { @@ -727,16 +730,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } catch (ConflictingModificationAppliedException e) { LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(), e.getPath()); - cause = new OptimisticLockFailedException("Optimistic lock failed.", e); + cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e); } catch (DataValidationFailedException e) { LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(), e.getPath(), e); // For debugging purposes, allow dumping of the modification. Coupled with the above // precondition log, it should allow us to understand what went on. - LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, - dataTree); - cause = new TransactionCommitFailedException("Data did not pass validation.", e); + LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", logContext, cohort.getIdentifier(), + modification, dataTree); + cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e); } catch (Exception e) { LOG.warn("{}: Unexpected failure in validation phase", logContext, e); cause = e; @@ -791,14 +794,109 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } if (!cohort.equals(head.cohort)) { - LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier()); - return; + // The tx isn't at the head of the queue so we can't start canCommit at this point. Here we check if this + // tx should be moved ahead of other tx's in the READY state in the pendingTransactions queue. If this tx + // has other participating shards, it could deadlock with other tx's accessing the same shards + // depending on the order the tx's are readied on each shard + // (see https://jira.opendaylight.org/browse/CONTROLLER-1836). Therefore, if the preceding participating + // shard names for a preceding pending tx, call it A, in the queue matches that of this tx, then this tx + // is allowed to be moved ahead of tx A in the queue so it is processed first to avoid potential deadlock + // if tx A is behind this tx in the pendingTransactions queue for a preceding shard. In other words, since + // canCommmit for this tx was requested before tx A, honor that request. If this tx is moved to the head of + // the queue as a result, then proceed with canCommit. + + Collection precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames()); + if (precedingShardNames.isEmpty()) { + LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier()); + return; + } + + LOG.debug("{}: Evaluating tx {} for canCommit - preceding participating shard names {}", + logContext, cohort.getIdentifier(), precedingShardNames); + final Iterator iter = pendingTransactions.iterator(); + int index = -1; + int moveToIndex = -1; + while (iter.hasNext()) { + final CommitEntry entry = iter.next(); + ++index; + + if (cohort.equals(entry.cohort)) { + if (moveToIndex < 0) { + LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit", + logContext, cohort.getIdentifier()); + return; + } + + LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue", + logContext, cohort.getIdentifier(), moveToIndex); + iter.remove(); + insertEntry(pendingTransactions, entry, moveToIndex); + + if (!cohort.equals(pendingTransactions.peek().cohort)) { + LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit", + logContext, cohort.getIdentifier()); + return; + } + + LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit", + logContext, cohort.getIdentifier()); + break; + } + + if (entry.cohort.getState() != State.READY) { + LOG.debug("{}: Skipping pending transaction {} in state {}", + logContext, entry.cohort.getIdentifier(), entry.cohort.getState()); + continue; + } + + final Collection pendingPrecedingShardNames = extractPrecedingShardNames( + entry.cohort.getParticipatingShardNames()); + + if (precedingShardNames.equals(pendingPrecedingShardNames)) { + if (moveToIndex < 0) { + LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}", + logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index); + moveToIndex = index; + } else { + LOG.debug( + "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}", + logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex); + } + } else { + LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping", + logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier()); + } + } } processNextPendingTransaction(); } - private void failPreCommit(final Exception cause) { + private void insertEntry(final Deque queue, final CommitEntry entry, final int atIndex) { + if (atIndex == 0) { + queue.addFirst(entry); + return; + } + + LOG.trace("Inserting into Deque at index {}", atIndex); + + Deque tempStack = new ArrayDeque<>(atIndex); + for (int i = 0; i < atIndex; i++) { + tempStack.push(queue.poll()); + } + + queue.addFirst(entry); + + tempStack.forEach(queue::addFirst); + } + + private Collection extractPrecedingShardNames( + final java.util.Optional> participatingShardNames) { + return participatingShardNames.map((Function, Collection>) + set -> set.headSet(shard.getShardName())).orElse(Collections.emptyList()); + } + + private void failPreCommit(final Throwable cause) { shard.getShardMBean().incrementFailedTransactionsCount(); pendingTransactions.poll().cohort.failedPreCommit(cause); processNextPendingTransaction(); @@ -817,25 +915,34 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeCandidateTip candidate; try { candidate = tip.prepare(cohort.getDataTreeModification()); - cohort.userPreCommit(candidate); - } catch (ExecutionException | TimeoutException | RuntimeException e) { + } catch (RuntimeException e) { failPreCommit(e); return; } - // Set the tip of the data tree. - tip = Verify.verifyNotNull(candidate); + cohort.userPreCommit(candidate, new FutureCallback() { + @Override + public void onSuccess(final Void noop) { + // Set the tip of the data tree. + tip = Verify.verifyNotNull(candidate); - entry.lastAccess = readTime(); + entry.lastAccess = readTime(); - pendingTransactions.remove(); - pendingCommits.add(entry); + pendingTransactions.remove(); + pendingCommits.add(entry); - LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); + LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); - cohort.successfulPreCommit(candidate); + cohort.successfulPreCommit(candidate); - processNextPendingTransaction(); + processNextPendingTransaction(); + } + + @Override + public void onFailure(final Throwable failure) { + failPreCommit(failure); + } + }); } private void failCommit(final Exception cause) { @@ -864,16 +971,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } + allMetadataCommittedTransaction(txId); shard.getShardMBean().incrementCommittedTransactionCount(); shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); // FIXME: propagate journal index - pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO); - - LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); - notifyListeners(candidate); + pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO, () -> { + LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); + notifyListeners(candidate); - processNextPending(); + processNextPending(); + }); } void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) { @@ -946,92 +1054,124 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final java.util.Optional> participatingShardNames) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, - cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); + cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf, + COMMIT_STEP_TIMEOUT), participatingShardNames); pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics // the newReadWriteTransaction() - ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final java.util.Optional> participatingShardNames) { if (txId.getHistoryId().getHistoryId() == 0) { - return createReadyCohort(txId, mod); + return createReadyCohort(txId, mod, participatingShardNames); } - return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod); + return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod, participatingShardNames); } @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") - void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { + void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, + final Function> accessTimeUpdater) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = readTime(); final Queue currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits : !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions; final CommitEntry currentTx = currentQueue.peek(); - if (currentTx != null && currentTx.lastAccess + timeout < now) { - final State state = currentTx.cohort.getState(); - LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, - currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, state); - boolean processNext = true; - final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after " - + transactionCommitTimeoutMillis + "ms"); - - switch (state) { - case CAN_COMMIT_PENDING: - currentQueue.remove().cohort.failedCanCommit(cohortFailure); - break; - case CAN_COMMIT_COMPLETE: - // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause - // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code - // in PRE_COMMIT_COMPLETE is changed. - currentQueue.remove().cohort.reportFailure(cohortFailure); - break; - case PRE_COMMIT_PENDING: - currentQueue.remove().cohort.failedPreCommit(cohortFailure); - break; - case PRE_COMMIT_COMPLETE: - // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we - // are ready we should commit the transaction, not abort it. Our current software stack does - // not allow us to do that consistently, because we persist at the time of commit, hence - // we can end up in a state where we have pre-committed a transaction, then a leader failover - // occurred ... the new leader does not see the pre-committed transaction and does not have - // a running timer. To fix this we really need two persistence events. - // - // The first one, done at pre-commit time will hold the transaction payload. When consensus - // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not - // apply the state in this event. - // - // The second one, done at commit (or abort) time holds only the transaction identifier and - // signals to followers that the state should (or should not) be applied. - // - // In order to make the pre-commit timer working across failovers, though, we need - // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately - // restart the timer. - currentQueue.remove().cohort.reportFailure(cohortFailure); - break; - case COMMIT_PENDING: - LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, - currentTx.cohort.getIdentifier()); - currentTx.lastAccess = now; - processNext = false; - return; - case READY: - currentQueue.remove().cohort.reportFailure(cohortFailure); - break; - case ABORTED: - case COMMITTED: - case FAILED: - default: - currentQueue.remove(); + if (currentTx == null) { + // Empty queue, no-op + return; + } + + long delta = now - currentTx.lastAccess; + if (delta < timeout) { + // Not expired yet, bail + return; + } + + final Optional updateOpt = accessTimeUpdater.apply(currentTx.cohort); + if (updateOpt.isPresent()) { + final long newAccess = updateOpt.get().longValue(); + final long newDelta = now - newAccess; + if (newDelta < delta) { + LOG.debug("{}: Updated current transaction {} access time", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = newAccess; + delta = newDelta; } - if (processNext) { - processNextPending(); + if (delta < timeout) { + // Not expired yet, bail + return; } } + + final long deltaMillis = TimeUnit.NANOSECONDS.toMillis(delta); + final State state = currentTx.cohort.getState(); + + LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, + currentTx.cohort.getIdentifier(), deltaMillis, state); + boolean processNext = true; + final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after " + + deltaMillis + "ms"); + + switch (state) { + case CAN_COMMIT_PENDING: + currentQueue.remove().cohort.failedCanCommit(cohortFailure); + break; + case CAN_COMMIT_COMPLETE: + // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause + // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code + // in PRE_COMMIT_COMPLETE is changed. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case PRE_COMMIT_PENDING: + currentQueue.remove().cohort.failedPreCommit(cohortFailure); + break; + case PRE_COMMIT_COMPLETE: + // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we + // are ready we should commit the transaction, not abort it. Our current software stack does + // not allow us to do that consistently, because we persist at the time of commit, hence + // we can end up in a state where we have pre-committed a transaction, then a leader failover + // occurred ... the new leader does not see the pre-committed transaction and does not have + // a running timer. To fix this we really need two persistence events. + // + // The first one, done at pre-commit time will hold the transaction payload. When consensus + // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not + // apply the state in this event. + // + // The second one, done at commit (or abort) time holds only the transaction identifier and + // signals to followers that the state should (or should not) be applied. + // + // In order to make the pre-commit timer working across failovers, though, we need + // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately + // restart the timer. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case COMMIT_PENDING: + LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = now; + processNext = false; + return; + case READY: + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case ABORTED: + case COMMITTED: + case FAILED: + default: + currentQueue.remove(); + } + + if (processNext) { + processNextPending(); + } } boolean startAbort(final SimpleShardDataTreeCohort cohort) { @@ -1062,7 +1202,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return false; } - TipProducingDataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); + DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); while (it.hasNext()) { final CommitEntry e = it.next(); if (cohort.equals(e.cohort)) { @@ -1084,7 +1224,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressWarnings("checkstyle:IllegalCatch") - private void rebaseTransactions(final Iterator iter, @Nonnull final TipProducingDataTreeTip newTip) { + private void rebaseTransactions(final Iterator iter, @Nonnull final DataTreeTip newTip) { tip = Preconditions.checkNotNull(newTip); while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort; @@ -1103,11 +1243,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { try { tip.validate(cohort.getDataTreeModification()); DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification()); - cohort.userPreCommit(candidate); cohort.setNewCandidate(candidate); tip = candidate; - } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) { + } catch (RuntimeException | DataValidationFailedException e) { LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e); cohort.reportFailure(e); }