X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=4aa7a7b786b6a0b925c4233a9043db59cb2be355;hb=refs%2Fchanges%2F49%2F85749%2F63;hp=bbcc42c3b83def4aebf2dde02301006081b95c16;hpb=beff6b6befd02f9a6dd7a4db10daad611776afab;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index bbcc42c3b8..4aa7a7b786 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
@@ -7,15 +7,16 @@
*/
package org.opendaylight.controller.cluster.datastore;
+import static akka.actor.ActorRef.noSender;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
+import static java.util.Objects.requireNonNullElse;
import akka.actor.ActorRef;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -50,7 +51,6 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.node.utils.transformer.ReusableNormalizedNodePruner;
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
@@ -59,12 +59,17 @@ import org.opendaylight.controller.cluster.datastore.persisted.CommitTransaction
import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload;
import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
@@ -83,9 +88,12 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +106,8 @@ import scala.concurrent.duration.FiniteDuration;
*
* This class is not part of the API contract and is subject to change at any time. It is NOT thread-safe.
*/
+@VisibleForTesting
+// non-final for mocking
public class ShardDataTree extends ShardDataTreeTransactionParent {
private static final class CommitEntry {
final SimpleShardDataTreeCohort cohort;
@@ -156,7 +166,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
private int currentTransactionBatch;
- ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree,
+ ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final DataTree dataTree,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final String logContext,
final ShardDataTreeMetadata>... metadata) {
@@ -170,7 +180,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
tip = dataTree;
}
- ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+ ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType,
final YangInstanceIdentifier root,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final String logContext,
@@ -188,7 +198,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
@VisibleForTesting
- public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
+ public ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType) {
this(shard, schemaContext, treeType, YangInstanceIdentifier.empty(),
new DefaultShardDataTreeChangeListenerPublisher(""), "");
}
@@ -201,21 +211,22 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return shard.ticker().read();
}
- public DataTree getDataTree() {
+ final DataTree getDataTree() {
return dataTree;
}
- SchemaContext getSchemaContext() {
+ @VisibleForTesting
+ final SchemaContext getSchemaContext() {
return schemaContext;
}
- void updateSchemaContext(final SchemaContext newSchemaContext) {
- dataTree.setSchemaContext(newSchemaContext);
- this.schemaContext = requireNonNull(newSchemaContext);
- this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
+ final void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
+ dataTree.setEffectiveModelContext(newSchemaContext);
+ schemaContext = newSchemaContext;
+ dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
- void resetTransactionBatch() {
+ final void resetTransactionBatch() {
currentTransactionBatch = 0;
}
@@ -225,7 +236,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @return A state snapshot
*/
@NonNull ShardDataTreeSnapshot takeStateSnapshot() {
- final NormalizedNode, ?> rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
+ final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
final Builder>, ShardDataTreeSnapshotMetadata>> metaBuilder =
ImmutableMap.builder();
@@ -272,7 +283,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
// delete everything first
mod.delete(YangInstanceIdentifier.empty());
- final Optional> maybeNode = snapshot.getRootNode();
+ final Optional maybeNode = snapshot.getRootNode();
if (maybeNode.isPresent()) {
// Add everything from the remote node back
mod.write(YangInstanceIdentifier.empty(), maybeNode.get());
@@ -294,16 +305,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ final void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ // TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation
applySnapshot(snapshot, UnaryOperator.identity());
}
- private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
- return new PruningDataTreeModification(delegate, dataTree,
- // TODO: we should be able to reuse the pruner, provided we are not reentrant
- ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext));
- }
-
/**
* Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data
* pruning in an attempt to adjust the state to our current SchemaContext.
@@ -311,19 +317,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoverySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
- applySnapshot(snapshot, this::wrapWithPruning);
+ final void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException {
+ // TODO: we should be able to reuse the pruner, provided we are not reentrant
+ final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+ dataSchemaContext);
+ if (snapshot.needsMigration()) {
+ final ReusableNormalizedNodePruner uintPruner = pruner.withUintAdaption();
+ applySnapshot(snapshot.getSnapshot(),
+ delegate -> new PruningDataTreeModification.Proactive(delegate, dataTree, uintPruner));
+ } else {
+ applySnapshot(snapshot.getSnapshot(),
+ delegate -> new PruningDataTreeModification.Reactive(delegate, dataTree, pruner));
+ }
}
@SuppressWarnings("checkstyle:IllegalCatch")
private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
- final Entry entry = payload.getCandidate();
+ final Entry entry = payload.acquireCandidate();
final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
- // FIXME: CONTROLLER-1923: examine version first
- final PruningDataTreeModification mod = wrapWithPruning(unwrapped);
+ final PruningDataTreeModification mod = createPruningModification(unwrapped,
+ NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0);
+
DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
mod.ready();
-
LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
try {
@@ -341,6 +357,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
allMetadataCommittedTransaction(entry.getKey());
}
+ private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped,
+ final boolean uintAdapting) {
+ // TODO: we should be able to reuse the pruner, provided we are not reentrant
+ final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+ dataSchemaContext);
+ return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption())
+ : new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner);
+ }
+
/**
* Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data
* pruning in an attempt to adjust the state to our current SchemaContext.
@@ -349,7 +374,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
+ final void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
if (payload instanceof CommitTransactionPayload) {
applyRecoveryCandidate((CommitTransactionPayload) payload);
} else if (payload instanceof AbortTransactionPayload) {
@@ -362,6 +387,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
} else if (payload instanceof PurgeLocalHistoryPayload) {
allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload) {
+ allMetadataSkipTransactions((SkipTransactionsPayload) payload);
} else {
LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
}
@@ -369,7 +396,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
private void applyReplicatedCandidate(final CommitTransactionPayload payload)
throws DataValidationFailedException, IOException {
- final Entry entry = payload.getCandidate();
+ final Entry entry = payload.acquireCandidate();
final TransactionIdentifier identifier = entry.getKey();
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
@@ -396,7 +423,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+ final void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
DataValidationFailedException {
/*
* This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
@@ -414,8 +441,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
applyReplicatedCandidate((CommitTransactionPayload) payload);
} else {
verify(identifier instanceof TransactionIdentifier);
- payloadReplicationComplete((TransactionIdentifier) identifier);
+ // if we did not track this transaction before, it means that it came from another leader and we are in
+ // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to
+ // the local DataTree and would be lost if it was only applied via payloadReplicationComplete().
+ if (!payloadReplicationComplete((TransactionIdentifier) identifier)) {
+ applyReplicatedCandidate((CommitTransactionPayload) payload);
+ }
}
+
+ // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed.
+ checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue()
+ .getCandidate());
} else if (payload instanceof AbortTransactionPayload) {
if (identifier != null) {
payloadReplicationComplete((AbortTransactionPayload) payload);
@@ -441,11 +477,35 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
}
allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((SkipTransactionsPayload)payload);
+ }
+ allMetadataSkipTransactions((SkipTransactionsPayload) payload);
} else {
LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
}
}
+ private void checkRootOverwrite(final DataTreeCandidate candidate) {
+ final DatastoreContext datastoreContext = shard.getDatastoreContext();
+ if (!datastoreContext.isSnapshotOnRootOverwrite()) {
+ return;
+ }
+
+ if (!datastoreContext.isPersistent()) {
+ // FIXME: why don't we want a snapshot in non-persistent state?
+ return;
+ }
+
+ // top level container ie "/"
+ if (candidate.getRootPath().isEmpty()
+ && candidate.getRootNode().getModificationType() == ModificationType.WRITE) {
+ LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext);
+ shard.self().tell(new InitiateCaptureSnapshot(), noSender());
+ }
+ }
+
private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) {
if (callback != null) {
replicationCallbacks.put(payload, callback);
@@ -463,22 +523,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
- private void payloadReplicationComplete(final TransactionIdentifier txId) {
+ private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
final CommitEntry current = pendingFinishCommits.peek();
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
allMetadataCommittedTransaction(txId);
- return;
+ return false;
}
if (!current.cohort.getIdentifier().equals(txId)) {
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
current.cohort.getIdentifier(), txId);
allMetadataCommittedTransaction(txId);
- return;
+ return false;
}
finishCommit(current.cohort);
+ return true;
}
private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
@@ -517,6 +578,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
+ private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) {
+ final var historyId = payload.getIdentifier();
+ final var txIds = payload.getTransactionIds();
+ for (ShardDataTreeMetadata> m : metadata) {
+ m.onTransactionsSkipped(historyId, txIds);
+ }
+ }
+
/**
* Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
* this method is used for re-establishing state when we are taking over
@@ -525,7 +594,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @param closed True if the chain should be created in closed state (i.e. pending purge)
* @return Transaction chain handle
*/
- ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
+ final ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
final boolean closed) {
final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
@@ -533,7 +602,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return ret;
}
- ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+ final ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
final @Nullable Runnable callback) {
ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
if (chain == null) {
@@ -548,7 +617,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return chain;
}
- ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+ final ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
shard.getShardMBean().incrementReadOnlyTransactionCount();
if (txId.getHistoryId().getHistoryId() == 0) {
@@ -558,7 +627,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
}
- ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+ final ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
shard.getShardMBean().incrementReadWriteTransactionCount();
if (txId.getHistoryId().getHistoryId() == 0) {
@@ -570,7 +639,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
@VisibleForTesting
- public void notifyListeners(final DataTreeCandidate candidate) {
+ final void notifyListeners(final DataTreeCandidate candidate) {
treeChangeListenerPublisher.publishChanges(candidate);
}
@@ -578,7 +647,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
* replication callbacks.
*/
- void purgeLeaderState() {
+ final void purgeLeaderState() {
for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
chain.close();
}
@@ -593,7 +662,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
- void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
+ final void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
if (commonCloseTransactionChain(id, callback)) {
replicatePayload(id, CloseLocalHistoryPayload.create(id,
shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
@@ -605,7 +674,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
*
* @param id History identifier
*/
- void closeTransactionChain(final LocalHistoryIdentifier id) {
+ final void closeTransactionChain(final LocalHistoryIdentifier id) {
commonCloseTransactionChain(id, null);
}
@@ -629,7 +698,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
- void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
+ final void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
if (chain == null) {
LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
@@ -643,23 +712,38 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
- Optional readCurrentData() {
+ final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds,
+ final Runnable callback) {
+ final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+ if (chain == null) {
+ LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id);
+ if (callback != null) {
+ callback.run();
+ }
+ return;
+ }
+
+ replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+ }
+
+ final Optional readCurrentData() {
return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
.map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
}
- public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+ final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
final Optional initialState,
final Consumer> onRegistration) {
treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
}
- int getQueueSize() {
+ final int getQueueSize() {
return pendingTransactions.size() + pendingCommits.size() + pendingFinishCommits.size();
}
@Override
- void abortTransaction(final AbstractShardDataTreeTransaction> transaction, final Runnable callback) {
+ final void abortTransaction(final AbstractShardDataTreeTransaction> transaction, final Runnable callback) {
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: aborting transaction {}", logContext, id);
replicatePayload(id, AbortTransactionPayload.create(
@@ -667,13 +751,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
@Override
- void abortFromTransactionActor(final AbstractShardDataTreeTransaction> transaction) {
+ final void abortFromTransactionActor(final AbstractShardDataTreeTransaction> transaction) {
// No-op for free-standing transactions
-
}
@Override
- ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
+ final ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
final Optional> participatingShardNames) {
final DataTreeModification snapshot = transaction.getSnapshot();
final TransactionIdentifier id = transaction.getIdentifier();
@@ -684,26 +767,27 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
}
- void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+ final void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
LOG.debug("{}: purging transaction {}", logContext, id);
replicatePayload(id, PurgeTransactionPayload.create(
id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
- public Optional> readNode(final YangInstanceIdentifier path) {
+ @VisibleForTesting
+ public final Optional readNode(final YangInstanceIdentifier path) {
return dataTree.takeSnapshot().readNode(path);
}
- DataTreeSnapshot takeSnapshot() {
+ final DataTreeSnapshot takeSnapshot() {
return dataTree.takeSnapshot();
}
@VisibleForTesting
- public DataTreeModification newModification() {
+ final DataTreeModification newModification() {
return dataTree.takeSnapshot().newModification();
}
- public Collection getAndClearPendingTransactions() {
+ final Collection getAndClearPendingTransactions() {
Collection ret = new ArrayList<>(getQueueSize());
for (CommitEntry entry: pendingFinishCommits) {
@@ -728,7 +812,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
/**
* Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
*/
- void resumeNextPendingTransaction() {
+ final void resumeNextPendingTransaction() {
LOG.debug("{}: attempting to resume transaction processing", logContext);
processNextPending();
}
@@ -814,6 +898,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return first != null && first.cohort.getState() == State.COMMIT_PENDING;
}
+ // non-final for mocking
void startCanCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry head = pendingTransactions.peek();
if (head == null) {
@@ -928,6 +1013,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
processNextPendingTransaction();
}
+ // non-final for mocking
@SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
@@ -1012,6 +1098,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
});
}
+ // non-final for mocking
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
final CommitEntry entry = pendingCommits.peek();
checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
@@ -1027,7 +1114,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
final TransactionIdentifier txId = cohort.getIdentifier();
final Payload payload;
try {
- payload = CommitTransactionPayload.create(txId, candidate,
+ payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(),
shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
} catch (IOException e) {
LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
@@ -1066,16 +1153,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
processNextPendingCommit();
}
- Collection getCohortActors() {
+ final Collection getCohortActors() {
return cohortRegistry.getCohortActors();
}
- void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
+ final void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
cohortRegistry.process(sender, message);
}
@Override
- ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final Exception failure) {
final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure);
pendingTransactions.add(new CommitEntry(cohort, readTime()));
@@ -1083,7 +1170,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
@Override
- ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final Optional> participatingShardNames) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
@@ -1094,7 +1181,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
// Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
// the newReadWriteTransaction()
- ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final Optional> participatingShardNames) {
if (txId.getHistoryId().getHistoryId() == 0) {
return createReadyCohort(txId, mod, participatingShardNames);
@@ -1104,7 +1191,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
- void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
+ final void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
final Function accessTimeUpdater) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = readTime();
@@ -1203,6 +1290,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
+ // non-final for mocking
boolean startAbort(final SimpleShardDataTreeCohort cohort) {
final Iterator it = Iterables.concat(pendingFinishCommits, pendingCommits,
pendingTransactions).iterator();
@@ -1231,7 +1319,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return false;
}
- DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree);
+ DataTreeTip newTip = requireNonNullElse(first.cohort.getCandidate(), dataTree);
while (it.hasNext()) {
final CommitEntry e = it.next();
if (cohort.equals(e.cohort)) {
@@ -1244,7 +1332,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return true;
} else {
- newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), newTip);
+ newTip = requireNonNullElse(e.cohort.getCandidate(), newTip);
}
}
@@ -1283,7 +1371,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
- void setRunOnPendingTransactionsComplete(final Runnable operation) {
+ final void setRunOnPendingTransactionsComplete(final Runnable operation) {
runOnPendingTransactionsComplete = operation;
maybeRunOperationOnPendingTransactionsComplete();
}
@@ -1298,16 +1386,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
- ShardStats getStats() {
+ final ShardStats getStats() {
return shard.getShardMBean();
}
- Iterator cohortIterator() {
+ final Iterator cohortIterator() {
return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
e -> e.cohort).iterator();
}
- void removeTransactionChain(final LocalHistoryIdentifier id) {
+ final void removeTransactionChain(final LocalHistoryIdentifier id) {
if (transactionChains.remove(id) != null) {
LOG.debug("{}: Removed transaction chain {}", logContext, id);
}