X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=c1b83923a662d0753cef52f67ff85e7919098b48;hb=d0f46920468c8e4b67c68bd9058572b2d10d75f1;hp=450de78c6764b790bf308949503d36b802bef753;hpb=b9711f17a53a4fad48197df6c39b58e4faadc862;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 450de78c67..c1b83923a6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -7,15 +7,16 @@ */ package org.opendaylight.controller.cluster.datastore; +import static akka.actor.ActorRef.noSender; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElse; import akka.actor.ActorRef; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -50,7 +51,6 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.node.utils.transformer.ReusableNormalizedNodePruner; import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; @@ -65,29 +65,34 @@ import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionP import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; +import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException; +import org.opendaylight.yangtools.yang.data.tree.api.DataTree; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeTip; +import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.tree.api.ModificationType; +import org.opendaylight.yangtools.yang.data.tree.api.TreeType; +import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -102,6 +107,8 @@ import scala.concurrent.duration.FiniteDuration; *

* This class is not part of the API contract and is subject to change at any time. It is NOT thread-safe. */ +@VisibleForTesting +// non-final for mocking public class ShardDataTree extends ShardDataTreeTransactionParent { private static final class CommitEntry { final SimpleShardDataTreeCohort cohort; @@ -205,21 +212,22 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return shard.ticker().read(); } - public DataTree getDataTree() { + final DataTree getDataTree() { return dataTree; } - SchemaContext getSchemaContext() { + @VisibleForTesting + final SchemaContext getSchemaContext() { return schemaContext; } - void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) { + final void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) { dataTree.setEffectiveModelContext(newSchemaContext); - this.schemaContext = newSchemaContext; - this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); + schemaContext = newSchemaContext; + dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); } - void resetTransactionBatch() { + final void resetTransactionBatch() { currentTransactionBatch = 0; } @@ -229,7 +237,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @return A state snapshot */ @NonNull ShardDataTreeSnapshot takeStateSnapshot() { - final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get(); + final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get(); final Builder>, ShardDataTreeSnapshotMetadata> metaBuilder = ImmutableMap.builder(); @@ -276,7 +284,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // delete everything first mod.delete(YangInstanceIdentifier.empty()); - final Optional> maybeNode = snapshot.getRootNode(); + final Optional maybeNode = snapshot.getRootNode(); if (maybeNode.isPresent()) { // Add everything from the remote node back mod.write(YangInstanceIdentifier.empty(), maybeNode.get()); @@ -298,7 +306,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + final void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { // TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation applySnapshot(snapshot, UnaryOperator.identity()); } @@ -310,7 +318,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException { + final void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException { // TODO: we should be able to reuse the pruner, provided we are not reentrant final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext( dataSchemaContext); @@ -367,7 +375,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @throws IOException when the snapshot fails to deserialize * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoveryPayload(final @NonNull Payload payload) throws IOException { + final void applyRecoveryPayload(final @NonNull Payload payload) throws IOException { if (payload instanceof CommitTransactionPayload) { applyRecoveryCandidate((CommitTransactionPayload) payload); } else if (payload instanceof AbortTransactionPayload) { @@ -380,6 +388,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof PurgeLocalHistoryPayload) { allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof SkipTransactionsPayload) { + allMetadataSkipTransactions((SkipTransactionsPayload) payload); } else { LOG.debug("{}: ignoring unhandled payload {}", logContext, payload); } @@ -414,7 +424,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @throws IOException when the snapshot fails to deserialize * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException, + final void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException, DataValidationFailedException { /* * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload @@ -439,6 +449,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { applyReplicatedCandidate((CommitTransactionPayload) payload); } } + + // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed. + checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue() + .getCandidate()); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); @@ -464,11 +478,35 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { payloadReplicationComplete((PurgeLocalHistoryPayload)payload); } allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); + } else if (payload instanceof SkipTransactionsPayload) { + if (identifier != null) { + payloadReplicationComplete((SkipTransactionsPayload)payload); + } + allMetadataSkipTransactions((SkipTransactionsPayload) payload); } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } } + private void checkRootOverwrite(final DataTreeCandidate candidate) { + final DatastoreContext datastoreContext = shard.getDatastoreContext(); + if (!datastoreContext.isSnapshotOnRootOverwrite()) { + return; + } + + if (!datastoreContext.isPersistent()) { + // FIXME: why don't we want a snapshot in non-persistent state? + return; + } + + // top level container ie "/" + if (candidate.getRootPath().isEmpty() + && candidate.getRootNode().getModificationType() == ModificationType.WRITE) { + LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext); + shard.self().tell(new InitiateCaptureSnapshot(), noSender()); + } + } + private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) { if (callback != null) { replicationCallbacks.put(payload, callback); @@ -541,6 +579,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) { + final var historyId = payload.getIdentifier(); + final var txIds = payload.getTransactionIds(); + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionsSkipped(historyId, txIds); + } + } + /** * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)}, * this method is used for re-establishing state when we are taking over @@ -549,7 +595,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param closed True if the chain should be created in closed state (i.e. pending purge) * @return Transaction chain handle */ - ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId, + final ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId, final boolean closed) { final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this); final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret); @@ -557,7 +603,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } - ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId, + final ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId, final @Nullable Runnable callback) { ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { @@ -572,7 +618,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return chain; } - ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) { + final ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) { shard.getShardMBean().incrementReadOnlyTransactionCount(); if (txId.getHistoryId().getHistoryId() == 0) { @@ -582,7 +628,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId); } - ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) { + final ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) { shard.getShardMBean().incrementReadWriteTransactionCount(); if (txId.getHistoryId().getHistoryId() == 0) { @@ -594,7 +640,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @VisibleForTesting - public void notifyListeners(final DataTreeCandidate candidate) { + final void notifyListeners(final DataTreeCandidate candidate) { treeChangeListenerPublisher.publishChanges(candidate); } @@ -602,7 +648,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled * replication callbacks. */ - void purgeLeaderState() { + final void purgeLeaderState() { for (ShardDataTreeTransactionChain chain : transactionChains.values()) { chain.close(); } @@ -617,7 +663,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param id History identifier * @param callback Callback to invoke upon completion, may be null */ - void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { + final void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { if (commonCloseTransactionChain(id, callback)) { replicatePayload(id, CloseLocalHistoryPayload.create(id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); @@ -629,7 +675,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * * @param id History identifier */ - void closeTransactionChain(final LocalHistoryIdentifier id) { + final void closeTransactionChain(final LocalHistoryIdentifier id) { commonCloseTransactionChain(id, null); } @@ -653,7 +699,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param id History identifier * @param callback Callback to invoke upon completion, may be null */ - void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { + final void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { final ShardDataTreeTransactionChain chain = transactionChains.remove(id); if (chain == null) { LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id); @@ -667,23 +713,38 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } - Optional readCurrentData() { + final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds, + final Runnable callback) { + final ShardDataTreeTransactionChain chain = transactionChains.get(id); + if (chain == null) { + LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id); + if (callback != null) { + callback.run(); + } + return; + } + + replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds, + shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); + } + + final Optional readCurrentData() { return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()) .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state)); } - public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, + final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, final Optional initialState, final Consumer> onRegistration) { treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration); } - int getQueueSize() { + final int getQueueSize() { return pendingTransactions.size() + pendingCommits.size() + pendingFinishCommits.size(); } @Override - void abortTransaction(final AbstractShardDataTreeTransaction transaction, final Runnable callback) { + final void abortTransaction(final AbstractShardDataTreeTransaction transaction, final Runnable callback) { final TransactionIdentifier id = transaction.getIdentifier(); LOG.debug("{}: aborting transaction {}", logContext, id); replicatePayload(id, AbortTransactionPayload.create( @@ -691,13 +752,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - void abortFromTransactionActor(final AbstractShardDataTreeTransaction transaction) { + final void abortFromTransactionActor(final AbstractShardDataTreeTransaction transaction) { // No-op for free-standing transactions - } @Override - ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction, + final ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction, final Optional> participatingShardNames) { final DataTreeModification snapshot = transaction.getSnapshot(); final TransactionIdentifier id = transaction.getIdentifier(); @@ -708,26 +768,27 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames); } - void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { + final void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { LOG.debug("{}: purging transaction {}", logContext, id); replicatePayload(id, PurgeTransactionPayload.create( id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } - public Optional> readNode(final YangInstanceIdentifier path) { + @VisibleForTesting + public final Optional readNode(final YangInstanceIdentifier path) { return dataTree.takeSnapshot().readNode(path); } - DataTreeSnapshot takeSnapshot() { + final DataTreeSnapshot takeSnapshot() { return dataTree.takeSnapshot(); } @VisibleForTesting - public DataTreeModification newModification() { + final DataTreeModification newModification() { return dataTree.takeSnapshot().newModification(); } - public Collection getAndClearPendingTransactions() { + final Collection getAndClearPendingTransactions() { Collection ret = new ArrayList<>(getQueueSize()); for (CommitEntry entry: pendingFinishCommits) { @@ -752,7 +813,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { /** * Called some time after {@link #processNextPendingTransaction()} decides to stop processing. */ - void resumeNextPendingTransaction() { + final void resumeNextPendingTransaction() { LOG.debug("{}: attempting to resume transaction processing", logContext); processNextPending(); } @@ -838,6 +899,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return first != null && first.cohort.getState() == State.COMMIT_PENDING; } + // non-final for mocking void startCanCommit(final SimpleShardDataTreeCohort cohort) { final CommitEntry head = pendingTransactions.peek(); if (head == null) { @@ -952,6 +1014,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingTransaction(); } + // non-final for mocking @SuppressWarnings("checkstyle:IllegalCatch") void startPreCommit(final SimpleShardDataTreeCohort cohort) { final CommitEntry entry = pendingTransactions.peek(); @@ -972,9 +1035,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } - cohort.userPreCommit(candidate, new FutureCallback() { + cohort.userPreCommit(candidate, new FutureCallback<>() { @Override - public void onSuccess(final Void noop) { + public void onSuccess(final Empty result) { // Set the tip of the data tree. tip = verifyNotNull(candidate); @@ -1036,6 +1099,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { }); } + // non-final for mocking void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) { final CommitEntry entry = pendingCommits.peek(); checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort); @@ -1090,16 +1154,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingCommit(); } - Collection getCohortActors() { + final Collection getCohortActors() { return cohortRegistry.getCohortActors(); } - void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { + final void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { cohortRegistry.process(sender, message); } @Override - ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Exception failure) { final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure); pendingTransactions.add(new CommitEntry(cohort, readTime())); @@ -1107,7 +1171,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Optional> participatingShardNames) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf, @@ -1118,7 +1182,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics // the newReadWriteTransaction() - ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Optional> participatingShardNames) { if (txId.getHistoryId().getHistoryId() == 0) { return createReadyCohort(txId, mod, participatingShardNames); @@ -1128,7 +1192,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") - void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, + final void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, final Function accessTimeUpdater) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = readTime(); @@ -1227,6 +1291,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + // non-final for mocking boolean startAbort(final SimpleShardDataTreeCohort cohort) { final Iterator it = Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions).iterator(); @@ -1255,7 +1320,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return false; } - DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); + DataTreeTip newTip = requireNonNullElse(first.cohort.getCandidate(), dataTree); while (it.hasNext()) { final CommitEntry e = it.next(); if (cohort.equals(e.cohort)) { @@ -1268,7 +1333,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return true; } else { - newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), newTip); + newTip = requireNonNullElse(e.cohort.getCandidate(), newTip); } } @@ -1307,7 +1372,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - void setRunOnPendingTransactionsComplete(final Runnable operation) { + final void setRunOnPendingTransactionsComplete(final Runnable operation) { runOnPendingTransactionsComplete = operation; maybeRunOperationOnPendingTransactionsComplete(); } @@ -1322,16 +1387,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - ShardStats getStats() { + final ShardStats getStats() { return shard.getShardMBean(); } - Iterator cohortIterator() { + final Iterator cohortIterator() { return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions), e -> e.cohort).iterator(); } - void removeTransactionChain(final LocalHistoryIdentifier id) { + final void removeTransactionChain(final LocalHistoryIdentifier id) { if (transactionChains.remove(id) != null) { LOG.debug("{}: Removed transaction chain {}", logContext, id); }