X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=20cee65c37ca50e62baf09cd3c4db25539242f8c;hp=064f6f5d8a476c6451ff9b4bd9bce0c4344549e0;hb=4a9e1103eef316f18c4a14cfccb050bddc01564b;hpb=225ff4000ac10d6dbdc2301d8d2165d282721413 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 064f6f5d8a..20cee65c37 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -36,15 +36,23 @@ import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.function.UnaryOperator; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; +import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; +import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; +import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload; +import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; @@ -68,7 +76,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; @@ -102,10 +109,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); private final Map transactionChains = new HashMap<>(); + private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); private final Queue pendingTransactions = new ArrayDeque<>(); private final Queue pendingCommits = new ArrayDeque<>(); private final Queue pendingFinishCommits = new ArrayDeque<>(); + + /** + * Callbacks that need to be invoked once a payload is replicated. + */ + private final Map replicationCallbacks = new HashMap<>(); + private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; private final Collection> metadata; @@ -124,7 +138,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private SchemaContext schemaContext; - public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { @@ -139,19 +153,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { tip = dataTree; } - public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, final YangInstanceIdentifier root, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, - final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) { + final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, + final ShardDataTreeMetadata... metadata) { this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root), - treeChangeListenerPublisher, dataChangeListenerPublisher, logContext); + treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata); } @VisibleForTesting public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, - new DefaultShardDataTreeChangeListenerPublisher(), - new DefaultShardDataChangeListenerPublisher(), ""); + new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), ""); } final String logContext() { @@ -312,6 +326,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ((CommitTransactionPayload) payload).getCandidate(); applyRecoveryCandidate(e.getValue()); allMetadataCommittedTransaction(e.getKey()); + } else if (payload instanceof AbortTransactionPayload) { + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); + } else if (payload instanceof PurgeTransactionPayload) { + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); + } else if (payload instanceof CreateLocalHistoryPayload) { + allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof CloseLocalHistoryPayload) { + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof PurgeLocalHistoryPayload) { + allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof DataTreeCandidatePayload) { applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate()); } else { @@ -367,11 +391,64 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { Verify.verify(identifier instanceof TransactionIdentifier); payloadReplicationComplete((TransactionIdentifier) identifier); } + } else if (payload instanceof AbortTransactionPayload) { + if (identifier != null) { + payloadReplicationComplete((AbortTransactionPayload) payload); + } else { + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); + } + } else if (payload instanceof PurgeTransactionPayload) { + if (identifier != null) { + payloadReplicationComplete((PurgeTransactionPayload) payload); + } else { + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); + } + } else if (payload instanceof CloseLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((CloseLocalHistoryPayload) payload); + } else { + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); + } + } else if (payload instanceof CloseLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((CloseLocalHistoryPayload) payload); + } else { + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); + } + } else if (payload instanceof CreateLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((CreateLocalHistoryPayload)payload); + } else { + allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); + } + } else if (payload instanceof PurgeLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((PurgeLocalHistoryPayload)payload); + } else { + allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); + } } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } } + private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) { + if (callback != null) { + replicationCallbacks.put(payload, callback); + } + shard.persistPayload(id, payload, true); + } + + private void payloadReplicationComplete(final AbstractIdentifiablePayload payload) { + final Runnable callback = replicationCallbacks.remove(payload); + if (callback != null) { + LOG.debug("{}: replication of {} completed, invoking {}", logContext, payload.getIdentifier(), callback); + callback.run(); + } else { + LOG.debug("{}: replication of {} has no callback", logContext, payload.getIdentifier()); + } + } + private void payloadReplicationComplete(final TransactionIdentifier txId) { final CommitEntry current = pendingFinishCommits.peek(); if (current == null) { @@ -380,7 +457,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } if (!current.cohort.getIdentifier().equals(txId)) { - LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext, + LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext, current.cohort.getIdentifier(), txId); return; } @@ -388,17 +465,65 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { finishCommit(current.cohort); } + private void allMetadataAbortedTransaction(final TransactionIdentifier txId) { + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionAborted(txId); + } + } + private void allMetadataCommittedTransaction(final TransactionIdentifier txId) { for (ShardDataTreeMetadata m : metadata) { m.onTransactionCommitted(txId); } } - ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { - ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier); + private void allMetadataPurgedTransaction(final TransactionIdentifier txId) { + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionPurged(txId); + } + } + + private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) { + for (ShardDataTreeMetadata m : metadata) { + m.onHistoryCreated(historyId); + } + } + + private void allMetadataClosedLocalHistory(final LocalHistoryIdentifier historyId) { + for (ShardDataTreeMetadata m : metadata) { + m.onHistoryClosed(historyId); + } + } + + private void allMetadataPurgedLocalHistory(final LocalHistoryIdentifier historyId) { + for (ShardDataTreeMetadata m : metadata) { + m.onHistoryPurged(historyId); + } + } + + /** + * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)}, + * this method is used for re-establishing state when we are taking over + * + * @param historyId Local history identifier + * @param closed True if the chain should be created in closed state (i.e. pending purge) + * @return Transaction chain handle + */ + ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId, + final boolean closed) { + final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this); + final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret); + Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, + existing); + return ret; + } + + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) { + ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { - chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this); - transactionChains.put(localHistoryIdentifier, chain); + chain = new ShardDataTreeTransactionChain(historyId, this); + transactionChains.put(historyId, chain); + shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true); } return chain; @@ -406,7 +531,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) { if (txId.getHistoryId().getHistoryId() == 0) { - return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot()); + return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot()); } return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId); @@ -446,21 +571,56 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - void closeAllTransactionChains() { + /** + * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled + * replication callbacks. + */ + void purgeLeaderState() { for (ShardDataTreeTransactionChain chain : transactionChains.values()) { chain.close(); } transactionChains.clear(); + replicationCallbacks.clear(); } - void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) { - final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId); - if (chain != null) { - chain.close(); - } else { - LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId); + /** + * Close a single transaction chain. + * + * @param id History identifier + * @param callback Callback to invoke upon completion, may be null + */ + void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) { + final ShardDataTreeTransactionChain chain = transactionChains.get(id); + if (chain == null) { + LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id); + if (callback != null) { + callback.run(); + } + return; } + + chain.close(); + replicatePayload(id, CloseLocalHistoryPayload.create(id), callback); + } + + /** + * Purge a single transaction chain. + * + * @param id History identifier + * @param callback Callback to invoke upon completion, may be null + */ + void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) { + final ShardDataTreeTransactionChain chain = transactionChains.remove(id); + if (chain == null) { + LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id); + if (callback != null) { + callback.run(); + } + return; + } + + replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); } Entry>>, @@ -493,8 +653,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - void abortTransaction(final AbstractShardDataTreeTransaction transaction) { - // Intentional no-op + void abortTransaction(final AbstractShardDataTreeTransaction transaction, final Runnable callback) { + final TransactionIdentifier id = transaction.getIdentifier(); + LOG.debug("{}: aborting transaction {}", logContext, id); + replicatePayload(id, AbortTransactionPayload.create(id), callback); + } + + @Override + void abortFromTransactionActor(final AbstractShardDataTreeTransaction transaction) { + // No-op for free-standing transactions + } @Override @@ -505,6 +673,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return createReadyCohort(transaction.getIdentifier(), snapshot); } + void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { + LOG.debug("{}: purging transaction {}", logContext, id); + replicatePayload(id, PurgeTransactionPayload.create(id), callback); + } + public Optional> readNode(final YangInstanceIdentifier path) { return dataTree.takeSnapshot().readNode(path); } @@ -537,8 +710,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } public Collection getAndClearPendingTransactions() { - Collection ret = new ArrayList<>(pendingTransactions.size() + pendingCommits.size() - + pendingFinishCommits.size()); + Collection ret = new ArrayList<>(getQueueSize()); for (CommitEntry entry: pendingFinishCommits) { ret.add(entry.cohort); @@ -561,28 +733,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @SuppressWarnings("checkstyle:IllegalCatch") private void processNextPendingTransaction() { - while (!pendingTransactions.isEmpty()) { - final CommitEntry entry = pendingTransactions.peek(); + processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> { final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); - if (cohort.isFailed()) { - LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier()); - pendingTransactions.remove(); - continue; - } - - if (cohort.getState() != State.CAN_COMMIT_PENDING) { - break; - } - LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); - entry.lastAccess = shard.ticker().read(); + entry.lastAccess = ticker().read(); return; } catch (ConflictingModificationAppliedException e) { LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(), @@ -604,24 +765,28 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one pendingTransactions.poll().cohort.failedCanCommit(cause); - } + }); + } - maybeRunOperationOnPendingTransactionsComplete(); + private void processNextPending() { + processNextPendingCommit(); + processNextPendingTransaction(); } - private void processNextPendingCommit() { - while (!pendingCommits.isEmpty()) { - final CommitEntry entry = pendingCommits.peek(); + private void processNextPending(final Queue queue, final State allowedState, + final Consumer processor) { + while (!queue.isEmpty()) { + final CommitEntry entry = queue.peek(); final SimpleShardDataTreeCohort cohort = entry.cohort; if (cohort.isFailed()) { LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier()); - pendingCommits.remove(); + queue.remove(); continue; } - if (cohort.getState() == State.COMMIT_PENDING) { - startCommit(cohort, cohort.getCandidate()); + if (cohort.getState() == allowedState) { + processor.accept(entry); } break; @@ -630,9 +795,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { maybeRunOperationOnPendingTransactionsComplete(); } - private void processNextPending() { - processNextPendingCommit(); - processNextPendingTransaction(); + private void processNextPendingCommit() { + processNextPending(pendingCommits, State.COMMIT_PENDING, + entry -> startCommit(entry.cohort, entry.cohort.getCandidate())); + } + + private boolean peekNextPendingCommit() { + final CommitEntry first = pendingCommits.peek(); + return first != null && first.cohort.getState() == State.COMMIT_PENDING; } void startCanCommit(final SimpleShardDataTreeCohort cohort) { @@ -658,17 +828,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final SimpleShardDataTreeCohort current = entry.cohort; Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); + + LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier()); + final DataTreeCandidateTip candidate; try { candidate = tip.prepare(cohort.getDataTreeModification()); - } catch (Exception e) { - failPreCommit(e); - return; - } - - try { cohort.userPreCommit(candidate); - } catch (ExecutionException | TimeoutException e) { + } catch (ExecutionException | TimeoutException | RuntimeException e) { failPreCommit(e); return; } @@ -676,10 +843,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // Set the tip of the data tree. tip = Verify.verifyNotNull(candidate); - entry.lastAccess = shard.ticker().read(); + entry.lastAccess = ticker().read(); pendingTransactions.remove(); pendingCommits.add(entry); + + LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); + cohort.successfulPreCommit(candidate); processNextPendingTransaction(); @@ -733,34 +903,51 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } - if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) { - LOG.debug("{}: No replication required, proceeding to finish commit", logContext); - pendingCommits.remove(); - pendingFinishCommits.add(entry); - finishCommit(cohort); - return; - } + LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier()); final TransactionIdentifier txId = cohort.getIdentifier(); final Payload payload; try { payload = CommitTransactionPayload.create(txId, candidate); - - // Once completed, we will continue via payloadReplicationComplete - entry.lastAccess = shard.ticker().read(); - shard.persistPayload(txId, payload); - - LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId); - - pendingCommits.remove(); - pendingFinishCommits.add(entry); } catch (IOException e) { LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e); pendingCommits.poll().cohort.failedCommit(e); + processNextPending(); return; } - processNextPending(); + // We process next transactions pending canCommit before we call persistPayload to possibly progress subsequent + // transactions to the COMMIT_PENDING state so the payloads can be batched for replication. This is done for + // single-shard transactions that immediately transition from canCommit to preCommit to commit. Note that + // if the next pending transaction is progressed to COMMIT_PENDING and this method (startCommit) is called, + // the next transaction will not attempt to replicate b/c the current transaction is still at the head of the + // pendingCommits queue. + processNextPendingTransaction(); + + // After processing next pending transactions, we can now remove the current transaction from pendingCommits. + // Note this must be done before the call to peekNextPendingCommit below so we check the next transaction + // in order to properly determine the batchHint flag for the call to persistPayload. + pendingCommits.remove(); + pendingFinishCommits.add(entry); + + // See if the next transaction is pending commit (ie in the COMMIT_PENDING state) so it can be batched with + // this transaction for replication. + boolean replicationBatchHint = peekNextPendingCommit(); + + // Once completed, we will continue via payloadReplicationComplete + shard.persistPayload(txId, payload, replicationBatchHint); + + entry.lastAccess = shard.ticker().read(); + + LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId); + + // Process the next transaction pending commit, if any. If there is one it will be batched with this + // transaction for replication. + processNextPendingCommit(); + } + + Collection getCohortActors() { + return cohortRegistry.getCohortActors(); } void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { @@ -772,14 +959,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeModification modification) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId, cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); - pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read())); + pendingTransactions.add(new CommitEntry(cohort, ticker().read())); return cohort; } @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); - final long now = shard.ticker().read(); + final long now = ticker().read(); final Queue currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits : !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions; @@ -882,7 +1069,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return true; } else { - newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), dataTree); + newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), newTip); } } @@ -891,7 +1078,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressWarnings("checkstyle:IllegalCatch") - private void rebaseTransactions(Iterator iter, @Nonnull TipProducingDataTreeTip newTip) { + private void rebaseTransactions(final Iterator iter, @Nonnull final TipProducingDataTreeTip newTip) { tip = Preconditions.checkNotNull(newTip); while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort;