X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=bc9549a64071649daf33ee06e66a2a78dcd0511e;hp=5a7ce80ffd14cf61e4086bd3f3a7e7895f91f60e;hb=ba99b089ca16480dd8b65b814f68e3fd26ab1246;hpb=925cb4a228d0fda99c7bfeb432eb25285a223887 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 5a7ce80ffd..bc9549a640 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -13,11 +13,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.primitives.UnsignedLong; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; import java.util.AbstractMap.SimpleEntry; @@ -76,7 +78,8 @@ import scala.concurrent.duration.Duration; * Internal shard state, similar to a DOMStore, but optimized for use in the actor system, * e.g. it does not expose public interfaces and assumes it is only ever called from a * single thread. - *

+ * + *

* This class is not part of the API contract and is subject to change at any time. */ @NotThreadSafe @@ -134,10 +137,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { new DefaultShardDataChangeListenerPublisher(), ""); } - String logContext() { + final String logContext() { return logContext; } + final Ticker ticker() { + return shard.ticker(); + } + public TipProducingDataTree getDataTree() { return dataTree; } @@ -366,7 +373,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier); if (chain == null) { chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this); @@ -474,7 +481,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeModification snapshot = transaction.getSnapshot(); snapshot.ready(); - return createReadyCohort(transaction.getId(), snapshot); + return createReadyCohort(transaction.getIdentifier(), snapshot); } public Optional> readNode(final YangInstanceIdentifier path) { @@ -667,6 +674,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { cohortRegistry.process(sender, message); } + @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification modification) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId, @@ -675,6 +683,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return cohort; } + @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = shard.ticker().read(); @@ -685,13 +694,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { boolean processNext = true; switch (currentTx.cohort.getState()) { case CAN_COMMIT_PENDING: - pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException()); + pendingTransactions.remove().cohort.failedCanCommit(new TimeoutException()); break; case CAN_COMMIT_COMPLETE: - pendingTransactions.poll().cohort.reportFailure(new TimeoutException()); + // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause + // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code + // in PRE_COMMIT_COMPLETE is changed. + pendingTransactions.remove().cohort.reportFailure(new TimeoutException()); break; case PRE_COMMIT_PENDING: - pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException()); + pendingTransactions.remove().cohort.failedPreCommit(new TimeoutException()); break; case PRE_COMMIT_COMPLETE: // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we @@ -711,7 +723,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // In order to make the pre-commit timer working across failovers, though, we need // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately // restart the timer. - pendingTransactions.poll().cohort.reportFailure(new TimeoutException()); + pendingTransactions.remove().cohort.reportFailure(new TimeoutException()); break; case COMMIT_PENDING: LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, @@ -724,7 +736,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { case FAILED: case READY: default: - pendingTransactions.poll(); + pendingTransactions.remove(); } if (processNext) { @@ -746,7 +758,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { if (cohort.getState() != State.COMMIT_PENDING) { LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(), cohort.getIdentifier()); - pendingTransactions.poll(); + + pendingTransactions.remove(); processNextTransaction(); } else { LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());