X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=e8b41d97474f49951b8b67846d22c7a06b73d185;hb=52725324973f22ac0c85ed4fd8459cf0ef504407;hp=dcf632d215e930fa2dbc1be3af65e5bb02645298;hpb=3613d3cbde81af18aab2d5917491e9f9b3bf63ca;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index dcf632d215..e8b41d9747 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -14,13 +14,13 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Iterables; import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; @@ -32,10 +32,10 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.UnaryOperator; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -44,6 +44,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; @@ -59,23 +60,23 @@ import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModifi import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -128,24 +129,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; private final Collection> metadata; - private final TipProducingDataTree dataTree; + private final DataTree dataTree; private final String logContext; private final Shard shard; private Runnable runOnPendingTransactionsComplete; /** * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a - * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another + * {@link DataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself. */ - private TipProducingDataTreeTip tip; + private DataTreeTip tip; private SchemaContext schemaContext; private int currentTransactionBatch; - ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { @@ -165,8 +166,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { - this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root), - treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata); + this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher, + dataChangeListenerPublisher, logContext, metadata); + } + + private static DataTree createDataTree(final TreeType treeType, final YangInstanceIdentifier root) { + final DataTreeConfiguration baseConfig = DataTreeConfiguration.getDefault(treeType); + return new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType()) + .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled()) + .setUniqueIndexes(baseConfig.isUniqueIndexEnabled()) + .setRootPath(root) + .build()); } @VisibleForTesting @@ -180,11 +190,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return logContext; } - final Ticker ticker() { - return shard.ticker(); + final long readTime() { + return shard.ticker().read(); } - public TipProducingDataTree getDataTree() { + public DataTree getDataTree() { return dataTree; } @@ -298,7 +308,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + void applyRecoverySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { applySnapshot(snapshot, this::wrapWithPruning); } @@ -332,7 +342,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @throws IOException when the snapshot fails to deserialize * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException { + void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException, DataValidationFailedException { if (payload instanceof CommitTransactionPayload) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); @@ -392,45 +402,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * pre-Boron state -- which limits the number of options here. */ if (payload instanceof CommitTransactionPayload) { + final TransactionIdentifier txId; if (identifier == null) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); - applyReplicatedCandidate(e.getKey(), e.getValue()); - allMetadataCommittedTransaction(e.getKey()); + txId = e.getKey(); + applyReplicatedCandidate(txId, e.getValue()); } else { Verify.verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier) identifier); + txId = (TransactionIdentifier) identifier; + payloadReplicationComplete(txId); } + allMetadataCommittedTransaction(txId); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); - } else { - allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } else if (payload instanceof PurgeTransactionPayload) { if (identifier != null) { payloadReplicationComplete((PurgeTransactionPayload) payload); - } else { - allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } else if (payload instanceof CloseLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CloseLocalHistoryPayload) payload); - } else { - allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof CreateLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CreateLocalHistoryPayload)payload); - } else { - allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } + allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof PurgeLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((PurgeLocalHistoryPayload)payload); - } else { - allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } + allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } @@ -522,12 +530,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } - ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) { + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId, + @Nullable final Runnable callback) { ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { chain = new ShardDataTreeTransactionChain(historyId, this); transactionChains.put(historyId, chain); - shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true); + replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback); + } else if (callback != null) { + callback.run(); } return chain; @@ -538,7 +549,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot()); } - return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId); + return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId); } ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) { @@ -547,7 +558,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { .newModification()); } - return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId); + return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId); } @VisibleForTesting @@ -617,7 +628,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } Optional readCurrentData() { - final Optional> currentState = + final java.util.Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); @@ -660,7 +671,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } public Optional> readNode(final YangInstanceIdentifier path) { - return dataTree.takeSnapshot().readNode(path); + return Optional.fromJavaUtil(dataTree.takeSnapshot().readNode(path)); } DataTreeSnapshot takeSnapshot() { @@ -672,24 +683,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return dataTree.takeSnapshot().newModification(); } - /** - * Commits a modification. - * - * @deprecated This method violates DataTree containment and will be removed. - */ - @VisibleForTesting - @Deprecated - public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException { - // Direct modification commit is a utility, which cannot be used while we have transactions in-flight - Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending"); - - modification.ready(); - dataTree.validate(modification); - DataTreeCandidate candidate = dataTree.prepare(modification); - dataTree.commit(candidate); - return candidate; - } - public Collection getAndClearPendingTransactions() { Collection ret = new ArrayList<>(getQueueSize()); @@ -736,17 +729,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { - cohort.throwCanCommitFailure(); - tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); - entry.lastAccess = ticker().read(); + entry.lastAccess = readTime(); return; } catch (ConflictingModificationAppliedException e) { LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(), e.getPath()); - cause = new OptimisticLockFailedException("Optimistic lock failed.", e); + cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e); } catch (DataValidationFailedException e) { LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(), e.getPath(), e); @@ -755,7 +746,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // precondition log, it should allow us to understand what went on. LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree); - cause = new TransactionCommitFailedException("Data did not pass validation.", e); + cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e); } catch (Exception e) { LOG.warn("{}: Unexpected failure in validation phase", logContext, e); cause = e; @@ -804,8 +795,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } void startCanCommit(final SimpleShardDataTreeCohort cohort) { - final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort; - if (!cohort.equals(current)) { + final CommitEntry head = pendingTransactions.peek(); + if (head == null) { + LOG.warn("{}: No transactions enqueued while attempting to start canCommit on {}", logContext, cohort); + return; + } + if (!cohort.equals(head.cohort)) { LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier()); return; } @@ -813,7 +808,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingTransaction(); } - private void failPreCommit(final Exception cause) { + private void failPreCommit(final Throwable cause) { shard.getShardMBean().incrementFailedTransactionsCount(); pendingTransactions.poll().cohort.failedPreCommit(cause); processNextPendingTransaction(); @@ -832,25 +827,34 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeCandidateTip candidate; try { candidate = tip.prepare(cohort.getDataTreeModification()); - cohort.userPreCommit(candidate); - } catch (ExecutionException | TimeoutException | RuntimeException e) { + } catch (RuntimeException e) { failPreCommit(e); return; } - // Set the tip of the data tree. - tip = Verify.verifyNotNull(candidate); + cohort.userPreCommit(candidate, new FutureCallback() { + @Override + public void onSuccess(final Void noop) { + // Set the tip of the data tree. + tip = Verify.verifyNotNull(candidate); - entry.lastAccess = ticker().read(); + entry.lastAccess = readTime(); - pendingTransactions.remove(); - pendingCommits.add(entry); + pendingTransactions.remove(); + pendingCommits.add(entry); - LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); + LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); - cohort.successfulPreCommit(candidate); + cohort.successfulPreCommit(candidate); - processNextPendingTransaction(); + processNextPendingTransaction(); + } + + @Override + public void onFailure(final Throwable failure) { + failPreCommit(failure); + } + }); } private void failCommit(final Exception cause) { @@ -883,12 +887,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); // FIXME: propagate journal index - pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO); - - LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); - notifyListeners(candidate); + pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO, () -> { + LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); + notifyListeners(candidate); - processNextPending(); + processNextPending(); + }); } void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) { @@ -955,17 +959,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Exception failure) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure); - pendingTransactions.add(new CommitEntry(cohort, ticker().read())); + final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure); + pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, - final DataTreeModification mod) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId, - cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); - pendingTransactions.add(new CommitEntry(cohort, ticker().read())); + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, + cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable), + COMMIT_STEP_TIMEOUT)); + pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } @@ -976,72 +980,107 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return createReadyCohort(txId, mod); } - return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod); + return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod); } @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") - void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { + void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, + final Function> accessTimeUpdater) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); - final long now = ticker().read(); + final long now = readTime(); final Queue currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits : !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions; final CommitEntry currentTx = currentQueue.peek(); - if (currentTx != null && currentTx.lastAccess + timeout < now) { - LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, - currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState()); - boolean processNext = true; - switch (currentTx.cohort.getState()) { - case CAN_COMMIT_PENDING: - currentQueue.remove().cohort.failedCanCommit(new TimeoutException()); - break; - case CAN_COMMIT_COMPLETE: - // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause - // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code - // in PRE_COMMIT_COMPLETE is changed. - currentQueue.remove().cohort.reportFailure(new TimeoutException()); - break; - case PRE_COMMIT_PENDING: - currentQueue.remove().cohort.failedPreCommit(new TimeoutException()); - break; - case PRE_COMMIT_COMPLETE: - // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we - // are ready we should commit the transaction, not abort it. Our current software stack does - // not allow us to do that consistently, because we persist at the time of commit, hence - // we can end up in a state where we have pre-committed a transaction, then a leader failover - // occurred ... the new leader does not see the pre-committed transaction and does not have - // a running timer. To fix this we really need two persistence events. - // - // The first one, done at pre-commit time will hold the transaction payload. When consensus - // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not - // apply the state in this event. - // - // The second one, done at commit (or abort) time holds only the transaction identifier and - // signals to followers that the state should (or should not) be applied. - // - // In order to make the pre-commit timer working across failovers, though, we need - // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately - // restart the timer. - currentQueue.remove().cohort.reportFailure(new TimeoutException()); - break; - case COMMIT_PENDING: - LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, - currentTx.cohort.getIdentifier()); - currentTx.lastAccess = now; - processNext = false; - return; - case ABORTED: - case COMMITTED: - case FAILED: - case READY: - default: - currentQueue.remove(); + if (currentTx == null) { + // Empty queue, no-op + return; + } + + long delta = now - currentTx.lastAccess; + if (delta < timeout) { + // Not expired yet, bail + return; + } + + final Optional updateOpt = accessTimeUpdater.apply(currentTx.cohort); + if (updateOpt.isPresent()) { + final long newAccess = updateOpt.get().longValue(); + final long newDelta = now - newAccess; + if (newDelta < delta) { + LOG.debug("{}: Updated current transaction {} access time", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = newAccess; + delta = newDelta; } - if (processNext) { - processNextPending(); + if (delta < timeout) { + // Not expired yet, bail + return; } } + + final long deltaMillis = TimeUnit.NANOSECONDS.toMillis(delta); + final State state = currentTx.cohort.getState(); + + LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, + currentTx.cohort.getIdentifier(), deltaMillis, state); + boolean processNext = true; + final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after " + + deltaMillis + "ms"); + + switch (state) { + case CAN_COMMIT_PENDING: + currentQueue.remove().cohort.failedCanCommit(cohortFailure); + break; + case CAN_COMMIT_COMPLETE: + // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause + // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code + // in PRE_COMMIT_COMPLETE is changed. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case PRE_COMMIT_PENDING: + currentQueue.remove().cohort.failedPreCommit(cohortFailure); + break; + case PRE_COMMIT_COMPLETE: + // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we + // are ready we should commit the transaction, not abort it. Our current software stack does + // not allow us to do that consistently, because we persist at the time of commit, hence + // we can end up in a state where we have pre-committed a transaction, then a leader failover + // occurred ... the new leader does not see the pre-committed transaction and does not have + // a running timer. To fix this we really need two persistence events. + // + // The first one, done at pre-commit time will hold the transaction payload. When consensus + // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not + // apply the state in this event. + // + // The second one, done at commit (or abort) time holds only the transaction identifier and + // signals to followers that the state should (or should not) be applied. + // + // In order to make the pre-commit timer working across failovers, though, we need + // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately + // restart the timer. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case COMMIT_PENDING: + LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = now; + processNext = false; + return; + case READY: + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case ABORTED: + case COMMITTED: + case FAILED: + default: + currentQueue.remove(); + } + + if (processNext) { + processNextPending(); + } } boolean startAbort(final SimpleShardDataTreeCohort cohort) { @@ -1072,7 +1111,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return false; } - TipProducingDataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); + DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); while (it.hasNext()) { final CommitEntry e = it.next(); if (cohort.equals(e.cohort)) { @@ -1094,7 +1133,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressWarnings("checkstyle:IllegalCatch") - private void rebaseTransactions(final Iterator iter, @Nonnull final TipProducingDataTreeTip newTip) { + private void rebaseTransactions(final Iterator iter, @Nonnull final DataTreeTip newTip) { tip = Preconditions.checkNotNull(newTip); while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort; @@ -1113,11 +1152,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { try { tip.validate(cohort.getDataTreeModification()); DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification()); - cohort.userPreCommit(candidate); cohort.setNewCandidate(candidate); tip = candidate; - } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) { + } catch (RuntimeException | DataValidationFailedException e) { LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e); cohort.reportFailure(e); } @@ -1139,4 +1177,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { runOnPendingTransactionsComplete = null; } } + + ShardStats getStats() { + return shard.getShardMBean(); + } + + Iterator cohortIterator() { + return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions), + e -> e.cohort).iterator(); + } + + void removeTransactionChain(final LocalHistoryIdentifier id) { + if (transactionChains.remove(id) != null) { + LOG.debug("{}: Removed transaction chain {}", logContext, id); + } + } }