X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=37e65f6a96e5130e53af66372ebb73937cf31332;hp=d398afefa7869cb70cc2eec74cf9153a918b162f;hb=653c1f5dd20c851ff0992b8d5ab7b1dcab891fca;hpb=013a6679470bf692753f2e04ab4398c97fd9f5d0 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index d398afefa7..37e65f6a96 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -14,7 +14,6 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -24,7 +23,6 @@ import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -37,15 +35,24 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.UnaryOperator; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; +import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; +import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; +import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload; +import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; @@ -56,7 +63,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -101,11 +107,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); + /** + * Process this many transactions in a single batched run. If we exceed this limit, we need to schedule later + * execution to finish up the batch. This is necessary in case of a long list of transactions which progress + * immediately through their preCommit phase -- if that happens, their completion eats up stack frames and could + * result in StackOverflowError. + */ + private static final int MAX_TRANSACTION_BATCH = 100; + private final Map transactionChains = new HashMap<>(); private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); private final Queue pendingTransactions = new ArrayDeque<>(); private final Queue pendingCommits = new ArrayDeque<>(); private final Queue pendingFinishCommits = new ArrayDeque<>(); + + /** + * Callbacks that need to be invoked once a payload is replicated. + */ + private final Map replicationCallbacks = new HashMap<>(); + private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; private final Collection> metadata; @@ -124,7 +144,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private SchemaContext schemaContext; - public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, + private int currentTransactionBatch; + + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { @@ -139,27 +161,28 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { tip = dataTree; } - public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, final YangInstanceIdentifier root, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, - final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) { + final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, + final ShardDataTreeMetadata... metadata) { this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root), - treeChangeListenerPublisher, dataChangeListenerPublisher, logContext); + treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata); } @VisibleForTesting public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, - new DefaultShardDataTreeChangeListenerPublisher(), - new DefaultShardDataChangeListenerPublisher(), ""); + new DefaultShardDataTreeChangeListenerPublisher(""), + new DefaultShardDataChangeListenerPublisher(""), ""); } final String logContext() { return logContext; } - final Ticker ticker() { - return shard.ticker(); + final long readTime() { + return shard.ticker().read(); } public TipProducingDataTree getDataTree() { @@ -175,6 +198,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.schemaContext = Preconditions.checkNotNull(newSchemaContext); } + void resetTransactionBatch() { + currentTransactionBatch = 0; + } + /** * Take a snapshot of current state for later recovery. * @@ -240,7 +267,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { dataTree.commit(candidate); notifyListeners(candidate); - LOG.debug("{}: state snapshot applied in %s", logContext, elapsed); + LOG.debug("{}: state snapshot applied in {}", logContext, elapsed); } /** @@ -312,8 +339,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ((CommitTransactionPayload) payload).getCandidate(); applyRecoveryCandidate(e.getValue()); allMetadataCommittedTransaction(e.getKey()); - } else if (payload instanceof DataTreeCandidatePayload) { - applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate()); + } else if (payload instanceof AbortTransactionPayload) { + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); + } else if (payload instanceof PurgeTransactionPayload) { + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); + } else if (payload instanceof CreateLocalHistoryPayload) { + allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof CloseLocalHistoryPayload) { + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof PurgeLocalHistoryPayload) { + allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } else { LOG.debug("{}: ignoring unhandled payload {}", logContext, payload); } @@ -358,20 +393,65 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * pre-Boron state -- which limits the number of options here. */ if (payload instanceof CommitTransactionPayload) { + final TransactionIdentifier txId; if (identifier == null) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); - applyReplicatedCandidate(e.getKey(), e.getValue()); - allMetadataCommittedTransaction(e.getKey()); + txId = e.getKey(); + applyReplicatedCandidate(txId, e.getValue()); } else { Verify.verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier) identifier); + txId = (TransactionIdentifier) identifier; + payloadReplicationComplete(txId); + } + allMetadataCommittedTransaction(txId); + } else if (payload instanceof AbortTransactionPayload) { + if (identifier != null) { + payloadReplicationComplete((AbortTransactionPayload) payload); + } + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); + } else if (payload instanceof PurgeTransactionPayload) { + if (identifier != null) { + payloadReplicationComplete((PurgeTransactionPayload) payload); + } + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); + } else if (payload instanceof CloseLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((CloseLocalHistoryPayload) payload); + } + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof CreateLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((CreateLocalHistoryPayload)payload); + } + allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof PurgeLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((PurgeLocalHistoryPayload)payload); } + allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } } + private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) { + if (callback != null) { + replicationCallbacks.put(payload, callback); + } + shard.persistPayload(id, payload, true); + } + + private void payloadReplicationComplete(final AbstractIdentifiablePayload payload) { + final Runnable callback = replicationCallbacks.remove(payload); + if (callback != null) { + LOG.debug("{}: replication of {} completed, invoking {}", logContext, payload.getIdentifier(), callback); + callback.run(); + } else { + LOG.debug("{}: replication of {} has no callback", logContext, payload.getIdentifier()); + } + } + private void payloadReplicationComplete(final TransactionIdentifier txId) { final CommitEntry current = pendingFinishCommits.peek(); if (current == null) { @@ -388,17 +468,68 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { finishCommit(current.cohort); } + private void allMetadataAbortedTransaction(final TransactionIdentifier txId) { + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionAborted(txId); + } + } + private void allMetadataCommittedTransaction(final TransactionIdentifier txId) { for (ShardDataTreeMetadata m : metadata) { m.onTransactionCommitted(txId); } } - ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { - ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier); + private void allMetadataPurgedTransaction(final TransactionIdentifier txId) { + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionPurged(txId); + } + } + + private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) { + for (ShardDataTreeMetadata m : metadata) { + m.onHistoryCreated(historyId); + } + } + + private void allMetadataClosedLocalHistory(final LocalHistoryIdentifier historyId) { + for (ShardDataTreeMetadata m : metadata) { + m.onHistoryClosed(historyId); + } + } + + private void allMetadataPurgedLocalHistory(final LocalHistoryIdentifier historyId) { + for (ShardDataTreeMetadata m : metadata) { + m.onHistoryPurged(historyId); + } + } + + /** + * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)}, + * this method is used for re-establishing state when we are taking over + * + * @param historyId Local history identifier + * @param closed True if the chain should be created in closed state (i.e. pending purge) + * @return Transaction chain handle + */ + ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId, + final boolean closed) { + final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this); + final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret); + Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, + existing); + return ret; + } + + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId, + @Nullable final Runnable callback) { + ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { - chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this); - transactionChains.put(localHistoryIdentifier, chain); + chain = new ShardDataTreeTransactionChain(historyId, this); + transactionChains.put(historyId, chain); + replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback); + } else if (callback != null) { + callback.run(); } return chain; @@ -406,10 +537,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) { if (txId.getHistoryId().getHistoryId() == 0) { - return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot()); + return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot()); } - return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId); + return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId); } ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) { @@ -418,74 +549,86 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { .newModification()); } - return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId); + return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId); } @VisibleForTesting public void notifyListeners(final DataTreeCandidate candidate) { - treeChangeListenerPublisher.publishChanges(candidate, logContext); - dataChangeListenerPublisher.publishChanges(candidate, logContext); + treeChangeListenerPublisher.publishChanges(candidate); + dataChangeListenerPublisher.publishChanges(candidate); } - void notifyOfInitialData(final DataChangeListenerRegistration>> listenerReg, final Optional currentState) { - if (currentState.isPresent()) { - ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance(); - localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(), - listenerReg.getScope()); - localPublisher.publishChanges(currentState.get(), logContext); - } - } - - void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, - final Optional currentState) { - if (currentState.isPresent()) { - ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance(); - localPublisher.registerTreeChangeListener(path, listener); - localPublisher.publishChanges(currentState.get(), logContext); - } - } - - void closeAllTransactionChains() { + /** + * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled + * replication callbacks. + */ + void purgeLeaderState() { for (ShardDataTreeTransactionChain chain : transactionChains.values()) { chain.close(); } transactionChains.clear(); + replicationCallbacks.clear(); } - void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) { - final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId); - if (chain != null) { - chain.close(); - } else { - LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId); + /** + * Close a single transaction chain. + * + * @param id History identifier + * @param callback Callback to invoke upon completion, may be null + */ + void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) { + final ShardDataTreeTransactionChain chain = transactionChains.get(id); + if (chain == null) { + LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id); + if (callback != null) { + callback.run(); + } + return; } + + chain.close(); + replicatePayload(id, CloseLocalHistoryPayload.create(id), callback); } - Entry>>, - Optional> registerChangeListener(final YangInstanceIdentifier path, - final AsyncDataChangeListener> listener, - final DataChangeScope scope) { - DataChangeListenerRegistration>> reg = - dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope); + /** + * Purge a single transaction chain. + * + * @param id History identifier + * @param callback Callback to invoke upon completion, may be null + */ + void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) { + final ShardDataTreeTransactionChain chain = transactionChains.remove(id); + if (chain == null) { + LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id); + if (callback != null) { + callback.run(); + } + return; + } - return new SimpleEntry<>(reg, readCurrentData()); + replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); } - private Optional readCurrentData() { + void registerDataChangeListener(final YangInstanceIdentifier path, + final AsyncDataChangeListener> listener, + final DataChangeScope scope, final Optional initialState, + final Consumer>>> + onRegistration) { + dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration); + } + + Optional readCurrentData() { final Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); } - public Entry, Optional> - registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { - final ListenerRegistration reg = - treeChangeListenerPublisher.registerTreeChangeListener(path, listener); - - return new SimpleEntry<>(reg, readCurrentData()); + public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, + final Optional initialState, + final Consumer> onRegistration) { + treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration); } int getQueueSize() { @@ -493,8 +636,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - void abortTransaction(final AbstractShardDataTreeTransaction transaction) { - // Intentional no-op + void abortTransaction(final AbstractShardDataTreeTransaction transaction, final Runnable callback) { + final TransactionIdentifier id = transaction.getIdentifier(); + LOG.debug("{}: aborting transaction {}", logContext, id); + replicatePayload(id, AbortTransactionPayload.create(id), callback); + } + + @Override + void abortFromTransactionActor(final AbstractShardDataTreeTransaction transaction) { + // No-op for free-standing transactions + } @Override @@ -505,6 +656,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return createReadyCohort(transaction.getIdentifier(), snapshot); } + void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { + LOG.debug("{}: purging transaction {}", logContext, id); + replicatePayload(id, PurgeTransactionPayload.create(id), callback); + } + public Optional> readNode(final YangInstanceIdentifier path) { return dataTree.takeSnapshot().readNode(path); } @@ -518,24 +674,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return dataTree.takeSnapshot().newModification(); } - /** - * Commits a modification. - * - * @deprecated This method violates DataTree containment and will be removed. - */ - @VisibleForTesting - @Deprecated - public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException { - // Direct modification commit is a utility, which cannot be used while we have transactions in-flight - Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending"); - - modification.ready(); - dataTree.validate(modification); - DataTreeCandidate candidate = dataTree.prepare(modification); - dataTree.commit(candidate); - return candidate; - } - public Collection getAndClearPendingTransactions() { Collection ret = new ArrayList<>(getQueueSize()); @@ -558,8 +696,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } + /** + * Called some time after {@link #processNextPendingTransaction()} decides to stop processing. + */ + void resumeNextPendingTransaction() { + LOG.debug("{}: attempting to resume transaction processing", logContext); + processNextPending(); + } + @SuppressWarnings("checkstyle:IllegalCatch") private void processNextPendingTransaction() { + ++currentTransactionBatch; + if (currentTransactionBatch > MAX_TRANSACTION_BATCH) { + LOG.debug("{}: Already processed {}, scheduling continuation", logContext, currentTransactionBatch); + shard.scheduleNextPendingTransaction(); + return; + } + processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> { final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); @@ -570,7 +723,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); - entry.lastAccess = shard.ticker().read(); + entry.lastAccess = readTime(); return; } catch (ConflictingModificationAppliedException e) { LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(), @@ -600,7 +753,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingTransaction(); } - private void processNextPending(Queue queue, State allowedState, Consumer processor) { + private void processNextPending(final Queue queue, final State allowedState, + final Consumer processor) { while (!queue.isEmpty()) { final CommitEntry entry = queue.peek(); final SimpleShardDataTreeCohort cohort = entry.cohort; @@ -632,8 +786,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } void startCanCommit(final SimpleShardDataTreeCohort cohort) { - final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort; - if (!cohort.equals(current)) { + final CommitEntry head = pendingTransactions.peek(); + if (head == null) { + LOG.warn("{}: No transactions enqueued while attempting to start canCommit on {}", logContext, cohort); + return; + } + if (!cohort.equals(head.cohort)) { LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier()); return; } @@ -669,7 +827,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // Set the tip of the data tree. tip = Verify.verifyNotNull(candidate); - entry.lastAccess = shard.ticker().read(); + entry.lastAccess = readTime(); pendingTransactions.remove(); pendingCommits.add(entry); @@ -781,77 +939,129 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, - final DataTreeModification modification) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId, + ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final Exception failure) { + final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure); + pendingTransactions.add(new CommitEntry(cohort, readTime())); + return cohort; + } + + @Override + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); - pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read())); + pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } + // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics + // the newReadWriteTransaction() + ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + if (txId.getHistoryId().getHistoryId() == 0) { + return createReadyCohort(txId, mod); + } + + return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod); + } + @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") - void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { + void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, + final Function> accessTimeUpdater) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); - final long now = shard.ticker().read(); + final long now = readTime(); final Queue currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits : !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions; final CommitEntry currentTx = currentQueue.peek(); - if (currentTx != null && currentTx.lastAccess + timeout < now) { - LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, - currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState()); - boolean processNext = true; - switch (currentTx.cohort.getState()) { - case CAN_COMMIT_PENDING: - currentQueue.remove().cohort.failedCanCommit(new TimeoutException()); - break; - case CAN_COMMIT_COMPLETE: - // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause - // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code - // in PRE_COMMIT_COMPLETE is changed. - currentQueue.remove().cohort.reportFailure(new TimeoutException()); - break; - case PRE_COMMIT_PENDING: - currentQueue.remove().cohort.failedPreCommit(new TimeoutException()); - break; - case PRE_COMMIT_COMPLETE: - // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we - // are ready we should commit the transaction, not abort it. Our current software stack does - // not allow us to do that consistently, because we persist at the time of commit, hence - // we can end up in a state where we have pre-committed a transaction, then a leader failover - // occurred ... the new leader does not see the pre-committed transaction and does not have - // a running timer. To fix this we really need two persistence events. - // - // The first one, done at pre-commit time will hold the transaction payload. When consensus - // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not - // apply the state in this event. - // - // The second one, done at commit (or abort) time holds only the transaction identifier and - // signals to followers that the state should (or should not) be applied. - // - // In order to make the pre-commit timer working across failovers, though, we need - // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately - // restart the timer. - currentQueue.remove().cohort.reportFailure(new TimeoutException()); - break; - case COMMIT_PENDING: - LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, - currentTx.cohort.getIdentifier()); - currentTx.lastAccess = now; - processNext = false; - return; - case ABORTED: - case COMMITTED: - case FAILED: - case READY: - default: - currentQueue.remove(); + if (currentTx == null) { + // Empty queue, no-op + return; + } + + long delta = now - currentTx.lastAccess; + if (delta < timeout) { + // Not expired yet, bail + return; + } + + final Optional updateOpt = accessTimeUpdater.apply(currentTx.cohort); + if (updateOpt.isPresent()) { + final long newAccess = updateOpt.get().longValue(); + final long newDelta = now - newAccess; + if (newDelta < delta) { + LOG.debug("{}: Updated current transaction {} access time", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = newAccess; + delta = newDelta; } - if (processNext) { - processNextPending(); + if (delta < timeout) { + // Not expired yet, bail + return; } } + + final long deltaMillis = TimeUnit.NANOSECONDS.toMillis(delta); + final State state = currentTx.cohort.getState(); + + LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, + currentTx.cohort.getIdentifier(), deltaMillis, state); + boolean processNext = true; + final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after " + + deltaMillis + "ms"); + + switch (state) { + case CAN_COMMIT_PENDING: + currentQueue.remove().cohort.failedCanCommit(cohortFailure); + break; + case CAN_COMMIT_COMPLETE: + // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause + // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code + // in PRE_COMMIT_COMPLETE is changed. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case PRE_COMMIT_PENDING: + currentQueue.remove().cohort.failedPreCommit(cohortFailure); + break; + case PRE_COMMIT_COMPLETE: + // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we + // are ready we should commit the transaction, not abort it. Our current software stack does + // not allow us to do that consistently, because we persist at the time of commit, hence + // we can end up in a state where we have pre-committed a transaction, then a leader failover + // occurred ... the new leader does not see the pre-committed transaction and does not have + // a running timer. To fix this we really need two persistence events. + // + // The first one, done at pre-commit time will hold the transaction payload. When consensus + // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not + // apply the state in this event. + // + // The second one, done at commit (or abort) time holds only the transaction identifier and + // signals to followers that the state should (or should not) be applied. + // + // In order to make the pre-commit timer working across failovers, though, we need + // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately + // restart the timer. + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case COMMIT_PENDING: + LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, + currentTx.cohort.getIdentifier()); + currentTx.lastAccess = now; + processNext = false; + return; + case READY: + currentQueue.remove().cohort.reportFailure(cohortFailure); + break; + case ABORTED: + case COMMITTED: + case FAILED: + default: + currentQueue.remove(); + } + + if (processNext) { + processNextPending(); + } } boolean startAbort(final SimpleShardDataTreeCohort cohort) { @@ -904,7 +1114,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressWarnings("checkstyle:IllegalCatch") - private void rebaseTransactions(Iterator iter, @Nonnull TipProducingDataTreeTip newTip) { + private void rebaseTransactions(final Iterator iter, @Nonnull final TipProducingDataTreeTip newTip) { tip = Preconditions.checkNotNull(newTip); while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort; @@ -949,4 +1159,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { runOnPendingTransactionsComplete = null; } } + + ShardStats getStats() { + return shard.getShardMBean(); + } + + Iterator cohortIterator() { + return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions), + e -> e.cohort).iterator(); + } + + void removeTransactionChain(final LocalHistoryIdentifier id) { + if (transactionChains.remove(id) != null) { + LOG.debug("{}: Removed transaction chain {}", logContext, id); + } + } }