Simplify code using Java 8 features
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 7aecda48db6ceaf7ca3a5dbc5ae266847b6676ca..e1c12cd489c3cb6f14164380cbff005512c50431 100644 (file)
@@ -20,21 +20,25 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Iterables;
 import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
-import java.util.concurrent.ExecutionException;
+import java.util.SortedSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.UnaryOperator;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -57,25 +61,23 @@ import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnap
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -101,6 +103,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             this.cohort = Preconditions.checkNotNull(cohort);
             lastAccess = now;
         }
+
+        @Override
+        public String toString() {
+            return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]";
+        }
     }
 
     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
@@ -116,7 +123,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
-    private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
+    private final Deque<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
 
@@ -126,35 +133,33 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private final Map<Payload, Runnable> replicationCallbacks = new HashMap<>();
 
     private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
-    private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
     private final Collection<ShardDataTreeMetadata<?>> metadata;
-    private final TipProducingDataTree dataTree;
+    private final DataTree dataTree;
     private final String logContext;
     private final Shard shard;
     private Runnable runOnPendingTransactionsComplete;
 
     /**
      * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a
-     * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another
+     * {@link DataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another
      * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current
      * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself.
      */
-    private TipProducingDataTreeTip tip;
+    private DataTreeTip tip;
 
     private SchemaContext schemaContext;
 
     private int currentTransactionBatch;
 
-    ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
+    ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
-            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+            final String logContext,
             final ShardDataTreeMetadata<?>... metadata) {
         this.dataTree = Preconditions.checkNotNull(dataTree);
         updateSchemaContext(schemaContext);
 
         this.shard = Preconditions.checkNotNull(shard);
         this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
-        this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
         this.logContext = Preconditions.checkNotNull(logContext);
         this.metadata = ImmutableList.copyOf(metadata);
         tip = dataTree;
@@ -163,17 +168,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
             final YangInstanceIdentifier root,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
-            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+            final String logContext,
             final ShardDataTreeMetadata<?>... metadata) {
-        this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root),
-                treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata);
+        this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher, logContext, metadata);
+    }
+
+    private static DataTree createDataTree(final TreeType treeType, final YangInstanceIdentifier root) {
+        final DataTreeConfiguration baseConfig = DataTreeConfiguration.getDefault(treeType);
+        return new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType())
+                .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled())
+                .setUniqueIndexes(baseConfig.isUniqueIndexEnabled())
+                .setRootPath(root)
+                .build());
     }
 
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
-                new DefaultShardDataTreeChangeListenerPublisher(""),
-                new DefaultShardDataChangeListenerPublisher(""), "");
+                new DefaultShardDataTreeChangeListenerPublisher(""), "");
     }
 
     final String logContext() {
@@ -184,7 +196,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return shard.ticker().read();
     }
 
-    public TipProducingDataTree getDataTree() {
+    public DataTree getDataTree() {
         return dataTree;
     }
 
@@ -298,12 +310,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param snapshot Snapshot that needs to be applied
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+    void applyRecoverySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
         applySnapshot(snapshot, this::wrapWithPruning);
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
+    private void applyRecoveryCandidate(final DataTreeCandidate candidate) {
         final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
         DataTreeCandidates.applyToModification(mod, candidate);
         mod.ready();
@@ -332,7 +344,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @throws IOException when the snapshot fails to deserialize
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
+    void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException {
         if (payload instanceof CommitTransactionPayload) {
             final Entry<TransactionIdentifier, DataTreeCandidate> e =
                     ((CommitTransactionPayload) payload).getCandidate();
@@ -554,7 +566,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     public void notifyListeners(final DataTreeCandidate candidate) {
         treeChangeListenerPublisher.publishChanges(candidate);
-        dataChangeListenerPublisher.publishChanges(candidate);
     }
 
     /**
@@ -609,16 +620,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
     }
 
-    void registerDataChangeListener(final YangInstanceIdentifier path,
-            final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
-            final DataChangeScope scope, final Optional<DataTreeCandidate> initialState,
-            final Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
-                    onRegistration) {
-        dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration);
-    }
-
     Optional<DataTreeCandidate> readCurrentData() {
-        final Optional<NormalizedNode<?, ?>> currentState =
+        final java.util.Optional<NormalizedNode<?, ?>> currentState =
                 dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
         return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
             YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
@@ -648,11 +651,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
 
-        return createReadyCohort(transaction.getIdentifier(), snapshot);
+        return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
     }
 
     void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
@@ -661,7 +665,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
-        return dataTree.takeSnapshot().readNode(path);
+        return Optional.fromJavaUtil(dataTree.takeSnapshot().readNode(path));
     }
 
     DataTreeSnapshot takeSnapshot() {
@@ -727,7 +731,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             } catch (ConflictingModificationAppliedException e) {
                 LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
                     e.getPath());
-                cause = new OptimisticLockFailedException("Optimistic lock failed.", e);
+                cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e);
             } catch (DataValidationFailedException e) {
                 LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
                     e.getPath(), e);
@@ -736,7 +740,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 // precondition log, it should allow us to understand what went on.
                 LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
                         dataTree);
-                cause = new TransactionCommitFailedException("Data did not pass validation.", e);
+                cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
             } catch (Exception e) {
                 LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
                 cause = e;
@@ -791,14 +795,109 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
         if (!cohort.equals(head.cohort)) {
-            LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
-            return;
+            // The tx isn't at the head of the queue so we can't start canCommit at this point. Here we check if this
+            // tx should be moved ahead of other tx's in the READY state in the pendingTransactions queue. If this tx
+            // has other participating shards, it could deadlock with other tx's accessing the same shards
+            // depending on the order the tx's are readied on each shard
+            // (see https://jira.opendaylight.org/browse/CONTROLLER-1836). Therefore, if the preceding participating
+            // shard names for a preceding pending tx, call it A, in the queue matches that of this tx, then this tx
+            // is allowed to be moved ahead of tx A in the queue so it is processed first to avoid potential deadlock
+            // if tx A is behind this tx in the pendingTransactions queue for a preceding shard. In other words, since
+            // canCommmit for this tx was requested before tx A, honor that request. If this tx is moved to the head of
+            // the queue as a result, then proceed with canCommit.
+
+            Collection<String> precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames());
+            if (precedingShardNames.isEmpty()) {
+                LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier());
+                return;
+            }
+
+            LOG.debug("{}: Evaluating tx {} for canCommit -  preceding participating shard names {}",
+                    logContext, cohort.getIdentifier(), precedingShardNames);
+            final Iterator<CommitEntry> iter = pendingTransactions.iterator();
+            int index = -1;
+            int moveToIndex = -1;
+            while (iter.hasNext()) {
+                final CommitEntry entry = iter.next();
+                ++index;
+
+                if (cohort.equals(entry.cohort)) {
+                    if (moveToIndex < 0) {
+                        LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit",
+                                logContext, cohort.getIdentifier());
+                        return;
+                    }
+
+                    LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue",
+                            logContext, cohort.getIdentifier(), moveToIndex);
+                    iter.remove();
+                    insertEntry(pendingTransactions, entry, moveToIndex);
+
+                    if (!cohort.equals(pendingTransactions.peek().cohort)) {
+                        LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit",
+                                logContext, cohort.getIdentifier());
+                        return;
+                    }
+
+                    LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit",
+                            logContext, cohort.getIdentifier());
+                    break;
+                }
+
+                if (entry.cohort.getState() != State.READY) {
+                    LOG.debug("{}: Skipping pending transaction {} in state {}",
+                            logContext, entry.cohort.getIdentifier(), entry.cohort.getState());
+                    continue;
+                }
+
+                final Collection<String> pendingPrecedingShardNames = extractPrecedingShardNames(
+                        entry.cohort.getParticipatingShardNames());
+
+                if (precedingShardNames.equals(pendingPrecedingShardNames)) {
+                    if (moveToIndex < 0) {
+                        LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}",
+                                logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index);
+                        moveToIndex = index;
+                    } else {
+                        LOG.debug(
+                            "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}",
+                            logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex);
+                    }
+                } else {
+                    LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping",
+                        logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier());
+                }
+            }
         }
 
         processNextPendingTransaction();
     }
 
-    private void failPreCommit(final Exception cause) {
+    private void insertEntry(Deque<CommitEntry> queue, CommitEntry entry, int atIndex) {
+        if (atIndex == 0) {
+            queue.addFirst(entry);
+            return;
+        }
+
+        LOG.trace("Inserting into Deque at index {}", atIndex);
+
+        Deque<CommitEntry> tempStack = new ArrayDeque<>(atIndex);
+        for (int i = 0; i < atIndex; i++) {
+            tempStack.push(queue.poll());
+        }
+
+        queue.addFirst(entry);
+
+        tempStack.forEach(queue::addFirst);
+    }
+
+    private Collection<String> extractPrecedingShardNames(
+            java.util.Optional<SortedSet<String>> participatingShardNames) {
+        return participatingShardNames.map((Function<SortedSet<String>, Collection<String>>)
+            set -> set.headSet(shard.getShardName())).orElse(Collections.<String>emptyList());
+    }
+
+    private void failPreCommit(final Throwable cause) {
         shard.getShardMBean().incrementFailedTransactionsCount();
         pendingTransactions.poll().cohort.failedPreCommit(cause);
         processNextPendingTransaction();
@@ -817,25 +916,34 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final DataTreeCandidateTip candidate;
         try {
             candidate = tip.prepare(cohort.getDataTreeModification());
-            cohort.userPreCommit(candidate);
-        } catch (ExecutionException | TimeoutException | RuntimeException e) {
+        } catch (RuntimeException e) {
             failPreCommit(e);
             return;
         }
 
-        // Set the tip of the data tree.
-        tip = Verify.verifyNotNull(candidate);
+        cohort.userPreCommit(candidate, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void noop) {
+                // Set the tip of the data tree.
+                tip = Verify.verifyNotNull(candidate);
+
+                entry.lastAccess = readTime();
 
-        entry.lastAccess = readTime();
+                pendingTransactions.remove();
+                pendingCommits.add(entry);
 
-        pendingTransactions.remove();
-        pendingCommits.add(entry);
+                LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
 
-        LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+                cohort.successfulPreCommit(candidate);
 
-        cohort.successfulPreCommit(candidate);
+                processNextPendingTransaction();
+            }
 
-        processNextPendingTransaction();
+            @Override
+            public void onFailure(final Throwable failure) {
+                failPreCommit(failure);
+            }
+        });
     }
 
     private void failCommit(final Exception cause) {
@@ -868,12 +976,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
         // FIXME: propagate journal index
-        pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO);
-
-        LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
-        notifyListeners(candidate);
+        pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO, () -> {
+            LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+            notifyListeners(candidate);
 
-        processNextPending();
+            processNextPending();
+        });
     }
 
     void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
@@ -946,92 +1054,124 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
-                cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+                cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
+                        COMMIT_STEP_TIMEOUT), participatingShardNames);
         pendingTransactions.add(new CommitEntry(cohort, readTime()));
         return cohort;
     }
 
     // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
     // the newReadWriteTransaction()
-    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         if (txId.getHistoryId().getHistoryId() == 0) {
-            return createReadyCohort(txId, mod);
+            return createReadyCohort(txId, mod, participatingShardNames);
         }
 
-        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
+        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod, participatingShardNames);
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
-    void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
+    void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
+            final Function<SimpleShardDataTreeCohort, Optional<Long>> accessTimeUpdater) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
         final long now = readTime();
 
         final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
             !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
         final CommitEntry currentTx = currentQueue.peek();
-        if (currentTx != null && currentTx.lastAccess + timeout < now) {
-            final State state = currentTx.cohort.getState();
-            LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
-                    currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, state);
-            boolean processNext = true;
-            final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
-                    + transactionCommitTimeoutMillis + "ms");
-
-            switch (state) {
-                case CAN_COMMIT_PENDING:
-                    currentQueue.remove().cohort.failedCanCommit(cohortFailure);
-                    break;
-                case CAN_COMMIT_COMPLETE:
-                    // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
-                    // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
-                    // in PRE_COMMIT_COMPLETE is changed.
-                    currentQueue.remove().cohort.reportFailure(cohortFailure);
-                    break;
-                case PRE_COMMIT_PENDING:
-                    currentQueue.remove().cohort.failedPreCommit(cohortFailure);
-                    break;
-                case PRE_COMMIT_COMPLETE:
-                    // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
-                    //        are ready we should commit the transaction, not abort it. Our current software stack does
-                    //        not allow us to do that consistently, because we persist at the time of commit, hence
-                    //        we can end up in a state where we have pre-committed a transaction, then a leader failover
-                    //        occurred ... the new leader does not see the pre-committed transaction and does not have
-                    //        a running timer. To fix this we really need two persistence events.
-                    //
-                    //        The first one, done at pre-commit time will hold the transaction payload. When consensus
-                    //        is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
-                    //        apply the state in this event.
-                    //
-                    //        The second one, done at commit (or abort) time holds only the transaction identifier and
-                    //        signals to followers that the state should (or should not) be applied.
-                    //
-                    //        In order to make the pre-commit timer working across failovers, though, we need
-                    //        a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
-                    //        restart the timer.
-                    currentQueue.remove().cohort.reportFailure(cohortFailure);
-                    break;
-                case COMMIT_PENDING:
-                    LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
-                        currentTx.cohort.getIdentifier());
-                    currentTx.lastAccess = now;
-                    processNext = false;
-                    return;
-                case READY:
-                    currentQueue.remove().cohort.reportFailure(cohortFailure);
-                    break;
-                case ABORTED:
-                case COMMITTED:
-                case FAILED:
-                default:
-                    currentQueue.remove();
+        if (currentTx == null) {
+            // Empty queue, no-op
+            return;
+        }
+
+        long delta = now - currentTx.lastAccess;
+        if (delta < timeout) {
+            // Not expired yet, bail
+            return;
+        }
+
+        final Optional<Long> updateOpt = accessTimeUpdater.apply(currentTx.cohort);
+        if (updateOpt.isPresent()) {
+            final long newAccess =  updateOpt.get().longValue();
+            final long newDelta = now - newAccess;
+            if (newDelta < delta) {
+                LOG.debug("{}: Updated current transaction {} access time", logContext,
+                    currentTx.cohort.getIdentifier());
+                currentTx.lastAccess = newAccess;
+                delta = newDelta;
             }
 
-            if (processNext) {
-                processNextPending();
+            if (delta < timeout) {
+                // Not expired yet, bail
+                return;
             }
         }
+
+        final long deltaMillis = TimeUnit.NANOSECONDS.toMillis(delta);
+        final State state = currentTx.cohort.getState();
+
+        LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
+            currentTx.cohort.getIdentifier(), deltaMillis, state);
+        boolean processNext = true;
+        final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
+                + deltaMillis + "ms");
+
+        switch (state) {
+            case CAN_COMMIT_PENDING:
+                currentQueue.remove().cohort.failedCanCommit(cohortFailure);
+                break;
+            case CAN_COMMIT_COMPLETE:
+                // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
+                // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
+                // in PRE_COMMIT_COMPLETE is changed.
+                currentQueue.remove().cohort.reportFailure(cohortFailure);
+                break;
+            case PRE_COMMIT_PENDING:
+                currentQueue.remove().cohort.failedPreCommit(cohortFailure);
+                break;
+            case PRE_COMMIT_COMPLETE:
+                // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
+                //        are ready we should commit the transaction, not abort it. Our current software stack does
+                //        not allow us to do that consistently, because we persist at the time of commit, hence
+                //        we can end up in a state where we have pre-committed a transaction, then a leader failover
+                //        occurred ... the new leader does not see the pre-committed transaction and does not have
+                //        a running timer. To fix this we really need two persistence events.
+                //
+                //        The first one, done at pre-commit time will hold the transaction payload. When consensus
+                //        is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
+                //        apply the state in this event.
+                //
+                //        The second one, done at commit (or abort) time holds only the transaction identifier and
+                //        signals to followers that the state should (or should not) be applied.
+                //
+                //        In order to make the pre-commit timer working across failovers, though, we need
+                //        a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
+                //        restart the timer.
+                currentQueue.remove().cohort.reportFailure(cohortFailure);
+                break;
+            case COMMIT_PENDING:
+                LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
+                    currentTx.cohort.getIdentifier());
+                currentTx.lastAccess = now;
+                processNext = false;
+                return;
+            case READY:
+                currentQueue.remove().cohort.reportFailure(cohortFailure);
+                break;
+            case ABORTED:
+            case COMMITTED:
+            case FAILED:
+            default:
+                currentQueue.remove();
+        }
+
+        if (processNext) {
+            processNextPending();
+        }
     }
 
     boolean startAbort(final SimpleShardDataTreeCohort cohort) {
@@ -1062,7 +1202,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return false;
         }
 
-        TipProducingDataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree);
+        DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree);
         while (it.hasNext()) {
             final CommitEntry e = it.next();
             if (cohort.equals(e.cohort)) {
@@ -1084,7 +1224,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void rebaseTransactions(final Iterator<CommitEntry> iter, @Nonnull final TipProducingDataTreeTip newTip) {
+    private void rebaseTransactions(final Iterator<CommitEntry> iter, @Nonnull final DataTreeTip newTip) {
         tip = Preconditions.checkNotNull(newTip);
         while (iter.hasNext()) {
             final SimpleShardDataTreeCohort cohort = iter.next().cohort;
@@ -1103,11 +1243,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 try {
                     tip.validate(cohort.getDataTreeModification());
                     DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
-                    cohort.userPreCommit(candidate);
 
                     cohort.setNewCandidate(candidate);
                     tip = candidate;
-                } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+                } catch (RuntimeException | DataValidationFailedException e) {
                     LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
                     cohort.reportFailure(e);
                 }