X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=b2549eaf4d70c5abce99a65fa516afdb1991819f;hb=HEAD;hp=bbcc42c3b83def4aebf2dde02301006081b95c16;hpb=beff6b6befd02f9a6dd7a4db10daad611776afab;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index bbcc42c3b8..72e7a545a7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -7,15 +7,16 @@ */ package org.opendaylight.controller.cluster.datastore; +import static akka.actor.ActorRef.noSender; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElse; import akka.actor.ActorRef; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -34,7 +35,6 @@ import java.util.Deque; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; import java.util.OptionalLong; import java.util.Queue; @@ -50,43 +50,49 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.node.utils.transformer.ReusableNormalizedNodePruner; import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload; -import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion; import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; +import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; +import org.opendaylight.controller.cluster.raft.messages.Payload; import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.concepts.Identifier; -import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; +import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException; +import org.opendaylight.yangtools.yang.data.tree.api.DataTree; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeTip; +import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.tree.api.ModificationType; +import org.opendaylight.yangtools.yang.data.tree.api.TreeType; +import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; @@ -98,6 +104,8 @@ import scala.concurrent.duration.FiniteDuration; *

* This class is not part of the API contract and is subject to change at any time. It is NOT thread-safe. */ +@VisibleForTesting +// non-final for mocking public class ShardDataTree extends ShardDataTreeTransactionParent { private static final class CommitEntry { final SimpleShardDataTreeCohort cohort; @@ -110,7 +118,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override public String toString() { - return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]"; + return "CommitEntry [tx=" + cohort.transactionId() + ", state=" + cohort.getState() + "]"; } } @@ -151,12 +159,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { */ private DataTreeTip tip; - private SchemaContext schemaContext; + private EffectiveModelContext schemaContext; private DataSchemaContextTree dataSchemaContext; private int currentTransactionBatch; - ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree, + ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final DataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { @@ -170,7 +178,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { tip = dataTree; } - ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, + ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType, final YangInstanceIdentifier root, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final String logContext, @@ -188,8 +196,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @VisibleForTesting - public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { - this(shard, schemaContext, treeType, YangInstanceIdentifier.empty(), + public ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType) { + this(shard, schemaContext, treeType, YangInstanceIdentifier.of(), new DefaultShardDataTreeChangeListenerPublisher(""), ""); } @@ -201,21 +209,22 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return shard.ticker().read(); } - public DataTree getDataTree() { + final DataTree getDataTree() { return dataTree; } - SchemaContext getSchemaContext() { + @VisibleForTesting + final EffectiveModelContext getSchemaContext() { return schemaContext; } - void updateSchemaContext(final SchemaContext newSchemaContext) { - dataTree.setSchemaContext(newSchemaContext); - this.schemaContext = requireNonNull(newSchemaContext); - this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); + final void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) { + dataTree.setEffectiveModelContext(newSchemaContext); + schemaContext = newSchemaContext; + dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); } - void resetTransactionBatch() { + final void resetTransactionBatch() { currentTransactionBatch = 0; } @@ -225,7 +234,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @return A state snapshot */ @NonNull ShardDataTreeSnapshot takeStateSnapshot() { - final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get(); + final NormalizedNode rootNode = takeSnapshot().readNode(YangInstanceIdentifier.of()).orElseThrow(); final Builder>, ShardDataTreeSnapshotMetadata> metaBuilder = ImmutableMap.builder(); @@ -252,14 +261,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } final Map>, ShardDataTreeSnapshotMetadata> snapshotMeta; - if (snapshot instanceof MetadataShardDataTreeSnapshot) { - snapshotMeta = ((MetadataShardDataTreeSnapshot) snapshot).getMetadata(); + if (snapshot instanceof MetadataShardDataTreeSnapshot metaSnapshot) { + snapshotMeta = metaSnapshot.getMetadata(); } else { snapshotMeta = ImmutableMap.of(); } - for (ShardDataTreeMetadata m : metadata) { - final ShardDataTreeSnapshotMetadata s = snapshotMeta.get(m.getSupportedType()); + for (var m : metadata) { + final var s = snapshotMeta.get(m.getSupportedType()); if (s != null) { m.applySnapshot(s); } else { @@ -267,16 +276,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification(); + final DataTreeModification unwrapped = newModification(); final DataTreeModification mod = wrapper.apply(unwrapped); // delete everything first - mod.delete(YangInstanceIdentifier.empty()); + mod.delete(YangInstanceIdentifier.of()); - final Optional> maybeNode = snapshot.getRootNode(); - if (maybeNode.isPresent()) { + snapshot.getRootNode().ifPresent(rootNode -> { // Add everything from the remote node back - mod.write(YangInstanceIdentifier.empty(), maybeNode.get()); - } + mod.write(YangInstanceIdentifier.of(), rootNode); + }); + mod.ready(); dataTree.validate(unwrapped); @@ -294,16 +303,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + final void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + // TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation applySnapshot(snapshot, UnaryOperator.identity()); } - private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) { - return new PruningDataTreeModification(delegate, dataTree, - // TODO: we should be able to reuse the pruner, provided we are not reentrant - ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext)); - } - /** * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data * pruning in an attempt to adjust the state to our current SchemaContext. @@ -311,34 +315,53 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoverySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { - applySnapshot(snapshot, this::wrapWithPruning); + final void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException { + // TODO: we should be able to reuse the pruner, provided we are not reentrant + final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext( + dataSchemaContext); + if (snapshot.needsMigration()) { + final ReusableNormalizedNodePruner uintPruner = pruner.withUintAdaption(); + applySnapshot(snapshot.getSnapshot(), + delegate -> new PruningDataTreeModification.Proactive(delegate, dataTree, uintPruner)); + } else { + applySnapshot(snapshot.getSnapshot(), + delegate -> new PruningDataTreeModification.Reactive(delegate, dataTree, pruner)); + } } @SuppressWarnings("checkstyle:IllegalCatch") private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException { - final Entry entry = payload.getCandidate(); - final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification(); - // FIXME: CONTROLLER-1923: examine version first - final PruningDataTreeModification mod = wrapWithPruning(unwrapped); - DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate()); - mod.ready(); + final var entry = payload.acquireCandidate(); + final var unwrapped = newModification(); + final var pruningMod = createPruningModification(unwrapped, + NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.streamVersion()) > 0); + DataTreeCandidates.applyToModification(pruningMod, entry.candidate()); + pruningMod.ready(); LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped); try { dataTree.validate(unwrapped); dataTree.commit(dataTree.prepare(unwrapped)); } catch (Exception e) { - File file = new File(System.getProperty("karaf.data", "."), + final var file = new File(System.getProperty("karaf.data", "."), "failed-recovery-payload-" + logContext + ".out"); DataTreeModificationOutput.toFile(file, unwrapped); - throw new IllegalStateException(String.format( - "%s: Failed to apply recovery payload. Modification data was written to file %s", - logContext, file), e); + throw new IllegalStateException( + "%s: Failed to apply recovery payload. Modification data was written to file %s".formatted( + logContext, file), + e); } - allMetadataCommittedTransaction(entry.getKey()); + allMetadataCommittedTransaction(entry.transactionId()); + } + + private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped, + final boolean uintAdapting) { + // TODO: we should be able to reuse the pruner, provided we are not reentrant + final var pruner = ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext); + return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption()) + : new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner); } /** @@ -349,19 +372,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @throws IOException when the snapshot fails to deserialize * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoveryPayload(final @NonNull Payload payload) throws IOException { - if (payload instanceof CommitTransactionPayload) { - applyRecoveryCandidate((CommitTransactionPayload) payload); - } else if (payload instanceof AbortTransactionPayload) { - allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); - } else if (payload instanceof PurgeTransactionPayload) { - allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); - } else if (payload instanceof CreateLocalHistoryPayload) { - allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); - } else if (payload instanceof CloseLocalHistoryPayload) { - allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); - } else if (payload instanceof PurgeLocalHistoryPayload) { - allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); + final void applyRecoveryPayload(final @NonNull Payload payload) throws IOException { + if (payload instanceof CommitTransactionPayload commit) { + applyRecoveryCandidate(commit); + } else if (payload instanceof AbortTransactionPayload abort) { + allMetadataAbortedTransaction(abort.getIdentifier()); + } else if (payload instanceof PurgeTransactionPayload purge) { + allMetadataPurgedTransaction(purge.getIdentifier()); + } else if (payload instanceof CreateLocalHistoryPayload create) { + allMetadataCreatedLocalHistory(create.getIdentifier()); + } else if (payload instanceof CloseLocalHistoryPayload close) { + allMetadataClosedLocalHistory(close.getIdentifier()); + } else if (payload instanceof PurgeLocalHistoryPayload purge) { + allMetadataPurgedLocalHistory(purge.getIdentifier()); + } else if (payload instanceof SkipTransactionsPayload skip) { + allMetadataSkipTransactions(skip); } else { LOG.debug("{}: ignoring unhandled payload {}", logContext, payload); } @@ -369,21 +394,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private void applyReplicatedCandidate(final CommitTransactionPayload payload) throws DataValidationFailedException, IOException { - final Entry entry = payload.getCandidate(); - final TransactionIdentifier identifier = entry.getKey(); - LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); + final var payloadCandidate = payload.acquireCandidate(); + final var transactionId = payloadCandidate.transactionId(); + LOG.debug("{}: Applying foreign transaction {}", logContext, transactionId); - final DataTreeModification mod = dataTree.takeSnapshot().newModification(); + final var mod = newModification(); // TODO: check version here, which will enable us to perform forward-compatibility transformations - DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate()); + DataTreeCandidates.applyToModification(mod, payloadCandidate.candidate()); mod.ready(); LOG.trace("{}: Applying foreign modification {}", logContext, mod); dataTree.validate(mod); - final DataTreeCandidate candidate = dataTree.prepare(mod); + final var candidate = dataTree.prepare(mod); dataTree.commit(candidate); - allMetadataCommittedTransaction(identifier); + allMetadataCommittedTransaction(transactionId); notifyListeners(candidate); } @@ -396,7 +421,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @throws IOException when the snapshot fails to deserialize * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException, + final void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException, DataValidationFailedException { /* * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload @@ -409,43 +434,74 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * In any case, we know that this is an entry coming from replication, hence we can be sure we will not observe * pre-Boron state -- which limits the number of options here. */ - if (payload instanceof CommitTransactionPayload) { + if (payload instanceof CommitTransactionPayload commit) { if (identifier == null) { - applyReplicatedCandidate((CommitTransactionPayload) payload); + applyReplicatedCandidate(commit); } else { verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier) identifier); + // if we did not track this transaction before, it means that it came from another leader and we are in + // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to + // the local DataTree and would be lost if it was only applied via payloadReplicationComplete(). + if (!payloadReplicationComplete((TransactionIdentifier) identifier)) { + applyReplicatedCandidate(commit); + } } - } else if (payload instanceof AbortTransactionPayload) { + + // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed. + checkRootOverwrite(commit.acquireCandidate().candidate()); + } else if (payload instanceof AbortTransactionPayload abort) { if (identifier != null) { - payloadReplicationComplete((AbortTransactionPayload) payload); + payloadReplicationComplete(abort); } - allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); - } else if (payload instanceof PurgeTransactionPayload) { + allMetadataAbortedTransaction(abort.getIdentifier()); + } else if (payload instanceof PurgeTransactionPayload purge) { if (identifier != null) { - payloadReplicationComplete((PurgeTransactionPayload) payload); + payloadReplicationComplete(purge); } - allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); - } else if (payload instanceof CloseLocalHistoryPayload) { + allMetadataPurgedTransaction(purge.getIdentifier()); + } else if (payload instanceof CloseLocalHistoryPayload close) { if (identifier != null) { - payloadReplicationComplete((CloseLocalHistoryPayload) payload); + payloadReplicationComplete(close); } - allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); - } else if (payload instanceof CreateLocalHistoryPayload) { + allMetadataClosedLocalHistory(close.getIdentifier()); + } else if (payload instanceof CreateLocalHistoryPayload create) { if (identifier != null) { - payloadReplicationComplete((CreateLocalHistoryPayload)payload); + payloadReplicationComplete(create); } - allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); - } else if (payload instanceof PurgeLocalHistoryPayload) { + allMetadataCreatedLocalHistory(create.getIdentifier()); + } else if (payload instanceof PurgeLocalHistoryPayload purge) { if (identifier != null) { - payloadReplicationComplete((PurgeLocalHistoryPayload)payload); + payloadReplicationComplete(purge); } - allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); + allMetadataPurgedLocalHistory(purge.getIdentifier()); + } else if (payload instanceof SkipTransactionsPayload skip) { + if (identifier != null) { + payloadReplicationComplete(skip); + } + allMetadataSkipTransactions(skip); } else { LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); } } + private void checkRootOverwrite(final DataTreeCandidate candidate) { + final DatastoreContext datastoreContext = shard.getDatastoreContext(); + if (!datastoreContext.isSnapshotOnRootOverwrite()) { + return; + } + + if (!datastoreContext.isPersistent()) { + // FIXME: why don't we want a snapshot in non-persistent state? + return; + } + + // top level container ie "/" + if (candidate.getRootPath().isEmpty() && candidate.getRootNode().modificationType() == ModificationType.WRITE) { + LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext); + shard.self().tell(new InitiateCaptureSnapshot(), noSender()); + } + } + private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) { if (callback != null) { replicationCallbacks.put(payload, callback); @@ -463,22 +519,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private void payloadReplicationComplete(final TransactionIdentifier txId) { - final CommitEntry current = pendingFinishCommits.peek(); + private boolean payloadReplicationComplete(final TransactionIdentifier txId) { + final var current = pendingFinishCommits.peek(); if (current == null) { LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); allMetadataCommittedTransaction(txId); - return; + return false; } - if (!current.cohort.getIdentifier().equals(txId)) { + final var cohortTxId = current.cohort.transactionId(); + if (!cohortTxId.equals(txId)) { LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext, - current.cohort.getIdentifier(), txId); + cohortTxId, txId); allMetadataCommittedTransaction(txId); - return; + return false; } finishCommit(current.cohort); + return true; } private void allMetadataAbortedTransaction(final TransactionIdentifier txId) { @@ -517,6 +575,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) { + final var historyId = payload.getIdentifier(); + final var txIds = payload.getTransactionIds(); + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionsSkipped(historyId, txIds); + } + } + /** * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)}, * this method is used for re-establishing state when we are taking over @@ -525,7 +591,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param closed True if the chain should be created in closed state (i.e. pending purge) * @return Transaction chain handle */ - ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId, + final ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId, final boolean closed) { final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this); final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret); @@ -533,7 +599,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } - ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId, + final ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId, final @Nullable Runnable callback) { ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { @@ -548,29 +614,33 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return chain; } - ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) { + final @NonNull ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) { shard.getShardMBean().incrementReadOnlyTransactionCount(); - if (txId.getHistoryId().getHistoryId() == 0) { - return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot()); - } + final var historyId = txId.getHistoryId(); + return historyId.getHistoryId() == 0 ? newStandaloneReadOnlyTransaction(txId) + : ensureTransactionChain(historyId, null).newReadOnlyTransaction(txId); + } - return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId); + final @NonNull ReadOnlyShardDataTreeTransaction newStandaloneReadOnlyTransaction(final TransactionIdentifier txId) { + return new ReadOnlyShardDataTreeTransaction(this, txId, takeSnapshot()); } - ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) { + final @NonNull ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) { shard.getShardMBean().incrementReadWriteTransactionCount(); - if (txId.getHistoryId().getHistoryId() == 0) { - return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot() - .newModification()); - } + final var historyId = txId.getHistoryId(); + return historyId.getHistoryId() == 0 ? newStandaloneReadWriteTransaction(txId) + : ensureTransactionChain(historyId, null).newReadWriteTransaction(txId); + } - return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId); + final @NonNull ReadWriteShardDataTreeTransaction newStandaloneReadWriteTransaction( + final TransactionIdentifier txId) { + return new ReadWriteShardDataTreeTransaction(this, txId, newModification()); } @VisibleForTesting - public void notifyListeners(final DataTreeCandidate candidate) { + final void notifyListeners(final DataTreeCandidate candidate) { treeChangeListenerPublisher.publishChanges(candidate); } @@ -578,7 +648,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled * replication callbacks. */ - void purgeLeaderState() { + final void purgeLeaderState() { for (ShardDataTreeTransactionChain chain : transactionChains.values()) { chain.close(); } @@ -593,7 +663,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param id History identifier * @param callback Callback to invoke upon completion, may be null */ - void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { + final void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { if (commonCloseTransactionChain(id, callback)) { replicatePayload(id, CloseLocalHistoryPayload.create(id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); @@ -605,7 +675,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * * @param id History identifier */ - void closeTransactionChain(final LocalHistoryIdentifier id) { + final void closeTransactionChain(final LocalHistoryIdentifier id) { commonCloseTransactionChain(id, null); } @@ -629,7 +699,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param id History identifier * @param callback Callback to invoke upon completion, may be null */ - void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { + final void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { final ShardDataTreeTransactionChain chain = transactionChains.remove(id); if (chain == null) { LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id); @@ -643,23 +713,37 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } - Optional readCurrentData() { - return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()) - .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state)); + final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds, + final Runnable callback) { + final ShardDataTreeTransactionChain chain = transactionChains.get(id); + if (chain == null) { + LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id); + if (callback != null) { + callback.run(); + } + return; + } + + replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds, + shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); + } + + final Optional readCurrentData() { + return readNode(YangInstanceIdentifier.of()) + .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.of(), state)); } - public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, - final Optional initialState, - final Consumer> onRegistration) { + final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, + final Optional initialState, final Consumer onRegistration) { treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration); } - int getQueueSize() { + final int getQueueSize() { return pendingTransactions.size() + pendingCommits.size() + pendingFinishCommits.size(); } @Override - void abortTransaction(final AbstractShardDataTreeTransaction transaction, final Runnable callback) { + final void abortTransaction(final AbstractShardDataTreeTransaction transaction, final Runnable callback) { final TransactionIdentifier id = transaction.getIdentifier(); LOG.debug("{}: aborting transaction {}", logContext, id); replicatePayload(id, AbortTransactionPayload.create( @@ -667,13 +751,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - void abortFromTransactionActor(final AbstractShardDataTreeTransaction transaction) { + final void abortFromTransactionActor(final AbstractShardDataTreeTransaction transaction) { // No-op for free-standing transactions - } @Override - ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction, + final ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction, final Optional> participatingShardNames) { final DataTreeModification snapshot = transaction.getSnapshot(); final TransactionIdentifier id = transaction.getIdentifier(); @@ -684,26 +767,27 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames); } - void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { + final void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { LOG.debug("{}: purging transaction {}", logContext, id); replicatePayload(id, PurgeTransactionPayload.create( id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } - public Optional> readNode(final YangInstanceIdentifier path) { - return dataTree.takeSnapshot().readNode(path); + @VisibleForTesting + public final Optional readNode(final YangInstanceIdentifier path) { + return takeSnapshot().readNode(path); } - DataTreeSnapshot takeSnapshot() { + final DataTreeSnapshot takeSnapshot() { return dataTree.takeSnapshot(); } @VisibleForTesting - public DataTreeModification newModification() { - return dataTree.takeSnapshot().newModification(); + final DataTreeModification newModification() { + return takeSnapshot().newModification(); } - public Collection getAndClearPendingTransactions() { + final Collection getAndClearPendingTransactions() { Collection ret = new ArrayList<>(getQueueSize()); for (CommitEntry entry: pendingFinishCommits) { @@ -728,7 +812,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { /** * Called some time after {@link #processNextPendingTransaction()} decides to stop processing. */ - void resumeNextPendingTransaction() { + final void resumeNextPendingTransaction() { LOG.debug("{}: attempting to resume transaction processing", logContext); processNextPending(); } @@ -746,25 +830,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); - LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: Validating transaction {}", logContext, cohort.transactionId()); Exception cause; try { tip.validate(modification); - LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); + LOG.debug("{}: Transaction {} validated", logContext, cohort.transactionId()); cohort.successfulCanCommit(); entry.lastAccess = readTime(); return; } catch (ConflictingModificationAppliedException e) { - LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(), + LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.transactionId(), e.getPath()); cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e); } catch (DataValidationFailedException e) { - LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(), + LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.transactionId(), e.getPath(), e); // For debugging purposes, allow dumping of the modification. Coupled with the above // precondition log, it should allow us to understand what went on. - LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.getIdentifier(), modification); + LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.transactionId(), modification); LOG.trace("{}: Current tree: {}", logContext, dataTree); cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e); } catch (Exception e) { @@ -789,7 +873,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final SimpleShardDataTreeCohort cohort = entry.cohort; if (cohort.isFailed()) { - LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: Removing failed transaction {}", logContext, cohort.transactionId()); queue.remove(); continue; } @@ -814,6 +898,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return first != null && first.cohort.getState() == State.COMMIT_PENDING; } + // non-final for mocking void startCanCommit(final SimpleShardDataTreeCohort cohort) { final CommitEntry head = pendingTransactions.peek(); if (head == null) { @@ -834,12 +919,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { Collection precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames()); if (precedingShardNames.isEmpty()) { - LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier()); + LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.transactionId()); return; } LOG.debug("{}: Evaluating tx {} for canCommit - preceding participating shard names {}", - logContext, cohort.getIdentifier(), precedingShardNames); + logContext, cohort.transactionId(), precedingShardNames); final Iterator iter = pendingTransactions.iterator(); int index = -1; int moveToIndex = -1; @@ -850,29 +935,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { if (cohort.equals(entry.cohort)) { if (moveToIndex < 0) { LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit", - logContext, cohort.getIdentifier()); + logContext, cohort.transactionId()); return; } LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue", - logContext, cohort.getIdentifier(), moveToIndex); + logContext, cohort.transactionId(), moveToIndex); iter.remove(); insertEntry(pendingTransactions, entry, moveToIndex); if (!cohort.equals(pendingTransactions.peek().cohort)) { LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit", - logContext, cohort.getIdentifier()); + logContext, cohort.transactionId()); return; } LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit", - logContext, cohort.getIdentifier()); + logContext, cohort.transactionId()); break; } if (entry.cohort.getState() != State.READY) { LOG.debug("{}: Skipping pending transaction {} in state {}", - logContext, entry.cohort.getIdentifier(), entry.cohort.getState()); + logContext, entry.cohort.transactionId(), entry.cohort.getState()); continue; } @@ -882,16 +967,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { if (precedingShardNames.equals(pendingPrecedingShardNames)) { if (moveToIndex < 0) { LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}", - logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index); + logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), index); moveToIndex = index; } else { LOG.debug( "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}", - logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex); + logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), moveToIndex); } } else { LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping", - logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier()); + logContext, pendingPrecedingShardNames, entry.cohort.transactionId()); } } } @@ -928,6 +1013,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingTransaction(); } + // non-final for mocking @SuppressWarnings("checkstyle:IllegalCatch") void startPreCommit(final SimpleShardDataTreeCohort cohort) { final CommitEntry entry = pendingTransactions.peek(); @@ -936,7 +1022,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final SimpleShardDataTreeCohort current = entry.cohort; verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); - final TransactionIdentifier currentId = current.getIdentifier(); + final TransactionIdentifier currentId = current.transactionId(); LOG.debug("{}: Preparing transaction {}", logContext, currentId); final DataTreeCandidateTip candidate; @@ -948,9 +1034,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } - cohort.userPreCommit(candidate, new FutureCallback() { + cohort.userPreCommit(candidate, new FutureCallback<>() { @Override - public void onSuccess(final Void noop) { + public void onSuccess(final Empty result) { // Set the tip of the data tree. tip = verifyNotNull(candidate); @@ -981,7 +1067,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @SuppressWarnings("checkstyle:IllegalCatch") private void finishCommit(final SimpleShardDataTreeCohort cohort) { - final TransactionIdentifier txId = cohort.getIdentifier(); + final TransactionIdentifier txId = cohort.transactionId(); final DataTreeCandidate candidate = cohort.getCandidate(); LOG.debug("{}: Resuming commit of transaction {}", logContext, txId); @@ -1012,22 +1098,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { }); } + // non-final for mocking void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) { final CommitEntry entry = pendingCommits.peek(); checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort); final SimpleShardDataTreeCohort current = entry.cohort; if (!cohort.equals(current)) { - LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.getIdentifier()); + LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.transactionId()); return; } - LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier()); + LOG.debug("{}: Starting commit for transaction {}", logContext, current.transactionId()); - final TransactionIdentifier txId = cohort.getIdentifier(); + final TransactionIdentifier txId = cohort.transactionId(); final Payload payload; try { - payload = CommitTransactionPayload.create(txId, candidate, + payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(), shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()); } catch (IOException e) { LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e); @@ -1066,16 +1153,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingCommit(); } - Collection getCohortActors() { + final Collection getCohortActors() { return cohortRegistry.getCohortActors(); } - void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { + final void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { cohortRegistry.process(sender, message); } @Override - ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Exception failure) { final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure); pendingTransactions.add(new CommitEntry(cohort, readTime())); @@ -1083,7 +1170,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Optional> participatingShardNames) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf, @@ -1094,17 +1181,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics // the newReadWriteTransaction() - ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Optional> participatingShardNames) { - if (txId.getHistoryId().getHistoryId() == 0) { + final var historyId = txId.getHistoryId(); + if (historyId.getHistoryId() == 0) { return createReadyCohort(txId, mod, participatingShardNames); } - - return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod, participatingShardNames); + return ensureTransactionChain(historyId, null).createReadyCohort(txId, mod, participatingShardNames); } @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") - void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, + final void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, final Function accessTimeUpdater) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = readTime(); @@ -1125,11 +1212,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final OptionalLong updateOpt = accessTimeUpdater.apply(currentTx.cohort); if (updateOpt.isPresent()) { - final long newAccess = updateOpt.getAsLong(); + final long newAccess = updateOpt.orElseThrow(); final long newDelta = now - newAccess; if (newDelta < delta) { LOG.debug("{}: Updated current transaction {} access time", logContext, - currentTx.cohort.getIdentifier()); + currentTx.cohort.transactionId()); currentTx.lastAccess = newAccess; delta = newDelta; } @@ -1144,7 +1231,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final State state = currentTx.cohort.getState(); LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, - currentTx.cohort.getIdentifier(), deltaMillis, state); + currentTx.cohort.transactionId(), deltaMillis, state); boolean processNext = true; final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after " + deltaMillis + "ms"); @@ -1184,7 +1271,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { break; case COMMIT_PENDING: LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, - currentTx.cohort.getIdentifier()); + currentTx.cohort.transactionId()); currentTx.lastAccess = now; processNext = false; return; @@ -1203,11 +1290,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + // non-final for mocking boolean startAbort(final SimpleShardDataTreeCohort cohort) { final Iterator it = Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions).iterator(); if (!it.hasNext()) { - LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.transactionId()); return true; } @@ -1215,8 +1303,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final CommitEntry first = it.next(); if (cohort.equals(first.cohort)) { if (cohort.getState() != State.COMMIT_PENDING) { - LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(), - cohort.getIdentifier()); + LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.transactionId(), + cohort.transactionId()); it.remove(); if (cohort.getCandidate() != null) { @@ -1227,15 +1315,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return true; } - LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier()); + LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.transactionId()); return false; } - DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); + DataTreeTip newTip = requireNonNullElse(first.cohort.getCandidate(), dataTree); while (it.hasNext()) { final CommitEntry e = it.next(); if (cohort.equals(e.cohort)) { - LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: aborting queued transaction {}", logContext, cohort.transactionId()); it.remove(); if (cohort.getCandidate() != null) { @@ -1243,12 +1331,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } return true; - } else { - newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), newTip); } + + newTip = requireNonNullElse(e.cohort.getCandidate(), newTip); } - LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier()); + LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.transactionId()); return true; } @@ -1258,16 +1346,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort; if (cohort.getState() == State.CAN_COMMIT_COMPLETE) { - LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.transactionId()); try { tip.validate(cohort.getDataTreeModification()); } catch (DataValidationFailedException | RuntimeException e) { - LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e); + LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.transactionId(), e); cohort.reportFailure(e); } } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) { - LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.transactionId()); try { tip.validate(cohort.getDataTreeModification()); @@ -1276,14 +1364,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { cohort.setNewCandidate(candidate); tip = candidate; } catch (RuntimeException | DataValidationFailedException e) { - LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e); + LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.transactionId(), e); cohort.reportFailure(e); } } } } - void setRunOnPendingTransactionsComplete(final Runnable operation) { + final void setRunOnPendingTransactionsComplete(final Runnable operation) { runOnPendingTransactionsComplete = operation; maybeRunOperationOnPendingTransactionsComplete(); } @@ -1298,16 +1386,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - ShardStats getStats() { + final ShardStats getStats() { return shard.getShardMBean(); } - Iterator cohortIterator() { + final Iterator cohortIterator() { return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions), e -> e.cohort).iterator(); } - void removeTransactionChain(final LocalHistoryIdentifier id) { + final void removeTransactionChain(final LocalHistoryIdentifier id) { if (transactionChains.remove(id) != null) { LOG.debug("{}: Removed transaction chain {}", logContext, id); }