X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=8832cd6d1f6db28fd834134930bfa68ffac5bfab;hp=89fa8fbc2507fc1f25d8de01cabb48a0cfd4359d;hb=ca8bd9ab89b808ea1008ccf07c389097430bc911;hpb=a47dd7a5d21ca68804a6d0e2e3ca765f223c2ef4 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 89fa8fbc25..8832cd6d1f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -12,8 +12,15 @@ import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.google.common.base.Verify; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.common.primitives.UnsignedLong; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.File; import java.io.IOException; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayDeque; @@ -27,15 +34,19 @@ import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.UnaryOperator; +import javax.annotation.Nonnull; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; -import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; +import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; +import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; @@ -68,6 +79,7 @@ import scala.concurrent.duration.Duration; * e.g. it does not expose public interfaces and assumes it is only ever called from a * single thread. * + *

* This class is not part of the API contract and is subject to change at any time. */ @NotThreadSafe @@ -90,6 +102,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final Queue pendingTransactions = new ArrayDeque<>(); private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; + private final Collection> metadata; private final TipProducingDataTree dataTree; private final String logContext; private final Shard shard; @@ -99,14 +112,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, - final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) { - this.dataTree = dataTree; + final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, + final ShardDataTreeMetadata... metadata) { + this.dataTree = Preconditions.checkNotNull(dataTree); updateSchemaContext(schemaContext); this.shard = Preconditions.checkNotNull(shard); this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher); this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher); this.logContext = Preconditions.checkNotNull(logContext); + this.metadata = ImmutableList.copyOf(metadata); } public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, @@ -122,10 +137,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { new DefaultShardDataChangeListenerPublisher(), ""); } - String logContext() { + final String logContext() { return logContext; } + final Ticker ticker() { + return shard.ticker(); + } + public TipProducingDataTree getDataTree() { return dataTree; } @@ -134,26 +153,227 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return schemaContext; } - void updateSchemaContext(final SchemaContext schemaContext) { - dataTree.setSchemaContext(schemaContext); - this.schemaContext = Preconditions.checkNotNull(schemaContext); + void updateSchemaContext(final SchemaContext newSchemaContext) { + dataTree.setSchemaContext(newSchemaContext); + this.schemaContext = Preconditions.checkNotNull(newSchemaContext); } - ShardDataTreeSnapshot takeRecoverySnapshot() { - return new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get()); + /** + * Take a snapshot of current state for later recovery. + * + * @return A state snapshot + */ + @Nonnull ShardDataTreeSnapshot takeStateSnapshot() { + final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get(); + final Builder>, ShardDataTreeSnapshotMetadata> metaBuilder = + ImmutableMap.builder(); + + for (ShardDataTreeMetadata m : metadata) { + final ShardDataTreeSnapshotMetadata meta = m.toSnapshot(); + if (meta != null) { + metaBuilder.put(meta.getType(), meta); + } + } + + return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build()); } - void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException { - // FIXME: purge any outstanding transactions + private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot, + final UnaryOperator wrapper) throws DataValidationFailedException { + final Stopwatch elapsed = Stopwatch.createStarted(); - final DataTreeModification snapshot = transaction.getSnapshot(); - snapshot.ready(); + if (!pendingTransactions.isEmpty()) { + LOG.warn("{}: applying state snapshot with pending transactions", logContext); + } + + final Map>, ShardDataTreeSnapshotMetadata> snapshotMeta; + if (snapshot instanceof MetadataShardDataTreeSnapshot) { + snapshotMeta = ((MetadataShardDataTreeSnapshot) snapshot).getMetadata(); + } else { + snapshotMeta = ImmutableMap.of(); + } + + for (ShardDataTreeMetadata m : metadata) { + final ShardDataTreeSnapshotMetadata s = snapshotMeta.get(m.getSupportedType()); + if (s != null) { + m.applySnapshot(s); + } else { + m.reset(); + } + } + + final DataTreeModification mod = wrapper.apply(dataTree.takeSnapshot().newModification()); + // delete everything first + mod.delete(YangInstanceIdentifier.EMPTY); + + final java.util.Optional> maybeNode = snapshot.getRootNode(); + if (maybeNode.isPresent()) { + // Add everything from the remote node back + mod.write(YangInstanceIdentifier.EMPTY, maybeNode.get()); + } + mod.ready(); + + final DataTreeModification unwrapped = unwrap(mod); + dataTree.validate(unwrapped); + DataTreeCandidateTip candidate = dataTree.prepare(unwrapped); + dataTree.commit(candidate); + notifyListeners(candidate); - dataTree.validate(snapshot); - dataTree.commit(dataTree.prepare(snapshot)); + LOG.debug("{}: state snapshot applied in %s", logContext, elapsed); } - private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { + /** + * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and + * does not perform any pruning. + * + * @param snapshot Snapshot that needs to be applied + * @throws DataValidationFailedException when the snapshot fails to apply + */ + void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + applySnapshot(snapshot, UnaryOperator.identity()); + } + + private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) { + return new PruningDataTreeModification(delegate, dataTree, schemaContext); + } + + private static DataTreeModification unwrap(final DataTreeModification modification) { + if (modification instanceof PruningDataTreeModification) { + return ((PruningDataTreeModification)modification).delegate(); + } + return modification; + } + + /** + * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data + * pruning in an attempt to adjust the state to our current SchemaContext. + * + * @param snapshot Snapshot that needs to be applied + * @throws DataValidationFailedException when the snapshot fails to apply + */ + void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + applySnapshot(snapshot, this::wrapWithPruning); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException { + final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification()); + DataTreeCandidates.applyToModification(mod, candidate); + mod.ready(); + + final DataTreeModification unwrapped = mod.delegate(); + LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped); + + try { + dataTree.validate(unwrapped); + dataTree.commit(dataTree.prepare(unwrapped)); + } catch (Exception e) { + File file = new File(System.getProperty("karaf.data", "."), + "failed-recovery-payload-" + logContext + ".out"); + DataTreeModificationOutput.toFile(file, unwrapped); + throw new IllegalStateException(String.format( + "%s: Failed to apply recovery payload. Modification data was written to file %s", + logContext, file), e); + } + } + + /** + * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data + * pruning in an attempt to adjust the state to our current SchemaContext. + * + * @param payload Payload + * @throws IOException when the snapshot fails to deserialize + * @throws DataValidationFailedException when the snapshot fails to apply + */ + void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException { + if (payload instanceof CommitTransactionPayload) { + final Entry e = + ((CommitTransactionPayload) payload).getCandidate(); + applyRecoveryCandidate(e.getValue()); + allMetadataCommittedTransaction(e.getKey()); + } else if (payload instanceof DataTreeCandidatePayload) { + applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate()); + } else { + LOG.debug("{}: ignoring unhandled payload {}", logContext, payload); + } + } + + private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign) + throws DataValidationFailedException { + LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); + + final DataTreeModification mod = dataTree.takeSnapshot().newModification(); + DataTreeCandidates.applyToModification(mod, foreign); + mod.ready(); + + LOG.trace("{}: Applying foreign modification {}", logContext, mod); + dataTree.validate(mod); + final DataTreeCandidate candidate = dataTree.prepare(mod); + dataTree.commit(candidate); + + notifyListeners(candidate); + } + + /** + * Apply a payload coming from the leader, which could actually be us. This method assumes the leader and follower + * SchemaContexts match and does not perform any pruning. + * + * @param identifier Payload identifier as returned from RaftActor + * @param payload Payload + * @throws IOException when the snapshot fails to deserialize + * @throws DataValidationFailedException when the snapshot fails to apply + */ + void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException, + DataValidationFailedException { + /* + * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload + * if we are the leader and it has originated with us. + * + * The identifier will only ever be non-null when we were the leader which achieved consensus. Unfortunately, + * though, this may not be the case anymore, as we are being called some time afterwards and we may not be + * acting in that capacity anymore. + * + * In any case, we know that this is an entry coming from replication, hence we can be sure we will not observe + * pre-Boron state -- which limits the number of options here. + */ + if (payload instanceof CommitTransactionPayload) { + if (identifier == null) { + final Entry e = + ((CommitTransactionPayload) payload).getCandidate(); + applyReplicatedCandidate(e.getKey(), e.getValue()); + allMetadataCommittedTransaction(e.getKey()); + } else { + Verify.verify(identifier instanceof TransactionIdentifier); + payloadReplicationComplete((TransactionIdentifier) identifier); + } + } else { + LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); + } + } + + private void payloadReplicationComplete(final TransactionIdentifier txId) { + final CommitEntry current = pendingTransactions.peek(); + if (current == null) { + LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); + return; + } + + if (!current.cohort.getIdentifier().equals(txId)) { + LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext, + current.cohort.getIdentifier(), txId); + return; + } + + finishCommit(current.cohort); + } + + private void allMetadataCommittedTransaction(final TransactionIdentifier txId) { + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionCommitted(txId); + } + } + + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier); if (chain == null) { chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this); @@ -180,6 +400,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId); } + @VisibleForTesting public void notifyListeners(final DataTreeCandidate candidate) { treeChangeListenerPublisher.publishChanges(candidate, logContext); dataChangeListenerPublisher.publishChanges(candidate, logContext); @@ -225,22 +446,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { Optional> registerChangeListener(final YangInstanceIdentifier path, final AsyncDataChangeListener> listener, final DataChangeScope scope) { - final DataChangeListenerRegistration>> reg = + DataChangeListenerRegistration>> reg = dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope); return new SimpleEntry<>(reg, readCurrentData()); } private Optional readCurrentData() { - final Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); + final Optional> currentState = + dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); } - public Entry, Optional> registerTreeChangeListener( - final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { - final ListenerRegistration reg = treeChangeListenerPublisher.registerTreeChangeListener( - path, listener); + public Entry, Optional> + registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { + final ListenerRegistration reg = + treeChangeListenerPublisher.registerTreeChangeListener(path, listener); return new SimpleEntry<>(reg, readCurrentData()); } @@ -249,20 +471,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return pendingTransactions.size(); } - void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException { - LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); - - final DataTreeModification mod = dataTree.takeSnapshot().newModification(); - DataTreeCandidates.applyToModification(mod, foreign); - mod.ready(); - - LOG.trace("{}: Applying foreign modification {}", logContext, mod); - dataTree.validate(mod); - final DataTreeCandidate candidate = dataTree.prepare(mod); - dataTree.commit(candidate); - notifyListeners(candidate); - } - @Override void abortTransaction(final AbstractShardDataTreeTransaction transaction) { // Intentional no-op @@ -273,34 +481,40 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeModification snapshot = transaction.getSnapshot(); snapshot.ready(); - return createReadyCohort(transaction.getId(), snapshot); + return createReadyCohort(transaction.getIdentifier(), snapshot); } public Optional> readNode(final YangInstanceIdentifier path) { return dataTree.takeSnapshot().readNode(path); } - public DataTreeSnapshot takeSnapshot() { + DataTreeSnapshot takeSnapshot() { return dataTree.takeSnapshot(); } + @VisibleForTesting public DataTreeModification newModification() { return dataTree.takeSnapshot().newModification(); } + /** + * Commits a modification. + * + * @deprecated This method violates DataTree containment and will be removed. + */ @VisibleForTesting - // FIXME: This should be removed, it violates encapsulation + @Deprecated public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException { modification.ready(); dataTree.validate(modification); - DataTreeCandidateTip candidate = dataTree.prepare(modification); + DataTreeCandidate candidate = dataTree.prepare(modification); dataTree.commit(candidate); return candidate; } public Collection getAndClearPendingTransactions() { Collection ret = new ArrayList<>(pendingTransactions.size()); - for(CommitEntry entry: pendingTransactions) { + for (CommitEntry entry: pendingTransactions) { ret.add(entry.cohort); } @@ -308,13 +522,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } + @SuppressWarnings("checkstyle:IllegalCatch") private void processNextTransaction() { while (!pendingTransactions.isEmpty()) { final CommitEntry entry = pendingTransactions.peek(); final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); - if(cohort.getState() != State.CAN_COMMIT_PENDING) { + if (cohort.getState() != State.CAN_COMMIT_PENDING) { break; } @@ -336,7 +551,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // For debugging purposes, allow dumping of the modification. Coupled with the above // precondition log, it should allow us to understand what went on. - LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree); + LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, + dataTree); cause = new TransactionCommitFailedException("Data did not pass validation.", e); } catch (Exception e) { LOG.warn("{}: Unexpected failure in validation phase", logContext, e); @@ -366,6 +582,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextTransaction(); } + @SuppressWarnings("checkstyle:IllegalCatch") void startPreCommit(final SimpleShardDataTreeCohort cohort) { final CommitEntry entry = pendingTransactions.peek(); Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort); @@ -397,6 +614,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextTransaction(); } + @SuppressWarnings("checkstyle:IllegalCatch") private void finishCommit(final SimpleShardDataTreeCohort cohort) { final TransactionIdentifier txId = cohort.getIdentifier(); final DataTreeCandidate candidate = cohort.getCandidate(); @@ -404,24 +622,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Resuming commit of transaction {}", logContext, txId); try { - try { - dataTree.commit(candidate); - } catch (IllegalStateException e) { - // We may get a "store tree and candidate base differ" IllegalStateException from commit under - // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last - // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before - // applying it to the state. We then become the leader and a second tx is pre-committed and - // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign - // candidate via applyState prior to the second tx. Since the second tx has already been - // pre-committed, when it gets here to commit it will get an IllegalStateException. - - // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner - // solution will be forthcoming. - - LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e); - applyForeignCandidate(txId, candidate); - } + dataTree.commit(candidate); } catch (Exception e) { + LOG.error("{}: Failed to commit transaction {}", logContext, txId, e); failCommit(e); return; } @@ -430,7 +633,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); // FIXME: propagate journal index - pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO); LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); @@ -468,32 +670,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId); } - private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) { - final CommitEntry current = pendingTransactions.peek(); - if (current == null) { - LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); - return; - } - - if (!current.cohort.getIdentifier().equals(txId)) { - LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext, - current.cohort.getIdentifier(), txId); - return; - } - - finishCommit(current.cohort); - } - - void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) { - // For now we do not care about anything else but transactions - Verify.verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier)identifier, payload); - } - void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { cohortRegistry.process(sender, message); } + @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification modification) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId, @@ -502,11 +683,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return cohort; } - void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload) - throws DataValidationFailedException, IOException { - applyForeignCandidate(identifier, payload.getCandidate().getValue()); - } - + @SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED", "DB_DUPLICATE_SWITCH_CLAUSES"}, + justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = shard.ticker().read(); @@ -520,6 +698,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException()); break; case CAN_COMMIT_COMPLETE: + // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause + // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code + // in PRE_COMMIT_COMPLETE is changed. pendingTransactions.poll().cohort.reportFailure(new TimeoutException()); break; case PRE_COMMIT_PENDING: @@ -556,6 +737,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { case FAILED: case READY: default: + // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In + // this case, we just want to drop the current entry that expired and thus ignore the return value. + // In fact we really shouldn't hit this case but we handle all enums for completeness. pendingTransactions.poll(); } @@ -578,7 +762,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { if (cohort.getState() != State.COMMIT_PENDING) { LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(), cohort.getIdentifier()); - pendingTransactions.poll(); + + pendingTransactions.remove(); processNextTransaction(); } else { LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier()); @@ -605,12 +790,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } private void maybeRunOperationOnPendingTransactionsComplete() { - if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) { - LOG.debug("{}: Pending transactions complete - running operation {}", logContext, - runOnPendingTransactionsComplete); - - runOnPendingTransactionsComplete.run(); - runOnPendingTransactionsComplete = null; - } - } + if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) { + LOG.debug("{}: Pending transactions complete - running operation {}", logContext, + runOnPendingTransactionsComplete); + + runOnPendingTransactionsComplete.run(); + runOnPendingTransactionsComplete = null; + } + } }