X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=d908c7fc620c7ec58fc7c23bfa6d98ab0d1c3896;hb=abeaf223cadd818e2054b516e39c20305ea144b8;hp=c37703c3359b503bf1e65cab8e46d49fdd9b5353;hpb=634dfac8eead60f443bf75e749c70d1f2bb29198;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index c37703c335..d908c7fc62 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
@@ -7,14 +7,17 @@
*/
package org.opendaylight.controller.cluster.datastore;
+import static akka.actor.ActorRef.noSender;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+import static java.util.Objects.requireNonNullElse;
+
import akka.actor.ActorRef;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
-import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
@@ -33,6 +36,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Queue;
import java.util.SortedSet;
import java.util.concurrent.TimeUnit;
@@ -40,26 +45,32 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.node.utils.transformer.ReusableNormalizedNodePruner;
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload;
import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
@@ -78,29 +89,31 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
/**
- * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
- * e.g. it does not expose public interfaces and assumes it is only ever called from a
- * single thread.
+ * Internal shard state, similar to a DOMStore, but optimized for use in the actor system, e.g. it does not expose
+ * public interfaces and assumes it is only ever called from a single thread.
*
*
- * This class is not part of the API contract and is subject to change at any time.
+ * This class is not part of the API contract and is subject to change at any time. It is NOT thread-safe.
*/
-@NotThreadSafe
public class ShardDataTree extends ShardDataTreeTransactionParent {
private static final class CommitEntry {
final SimpleShardDataTreeCohort cohort;
long lastAccess;
CommitEntry(final SimpleShardDataTreeCohort cohort, final long now) {
- this.cohort = Preconditions.checkNotNull(cohort);
+ this.cohort = requireNonNull(cohort);
lastAccess = now;
}
@@ -110,7 +123,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
- private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+ private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(FiniteDuration.create(5, TimeUnit.SECONDS));
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
/**
@@ -148,24 +161,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
private DataTreeTip tip;
private SchemaContext schemaContext;
+ private DataSchemaContextTree dataSchemaContext;
private int currentTransactionBatch;
- ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree,
+ ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final DataTree dataTree,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final String logContext,
final ShardDataTreeMetadata>... metadata) {
- this.dataTree = Preconditions.checkNotNull(dataTree);
+ this.dataTree = requireNonNull(dataTree);
updateSchemaContext(schemaContext);
- this.shard = Preconditions.checkNotNull(shard);
- this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
- this.logContext = Preconditions.checkNotNull(logContext);
+ this.shard = requireNonNull(shard);
+ this.treeChangeListenerPublisher = requireNonNull(treeChangeListenerPublisher);
+ this.logContext = requireNonNull(logContext);
this.metadata = ImmutableList.copyOf(metadata);
tip = dataTree;
}
- ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+ ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType,
final YangInstanceIdentifier root,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final String logContext,
@@ -183,8 +197,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
@VisibleForTesting
- public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
- this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
+ public ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType) {
+ this(shard, schemaContext, treeType, YangInstanceIdentifier.empty(),
new DefaultShardDataTreeChangeListenerPublisher(""), "");
}
@@ -204,9 +218,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return schemaContext;
}
- void updateSchemaContext(final SchemaContext newSchemaContext) {
- dataTree.setSchemaContext(newSchemaContext);
- this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
+ void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
+ dataTree.setEffectiveModelContext(newSchemaContext);
+ schemaContext = newSchemaContext;
+ dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
void resetTransactionBatch() {
@@ -218,8 +233,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
*
* @return A state snapshot
*/
- @Nonnull ShardDataTreeSnapshot takeStateSnapshot() {
- final NormalizedNode, ?> rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get();
+ @NonNull ShardDataTreeSnapshot takeStateSnapshot() {
+ final NormalizedNode, ?> rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
final Builder>, ShardDataTreeSnapshotMetadata>> metaBuilder =
ImmutableMap.builder();
@@ -237,7 +252,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return !pendingTransactions.isEmpty() || !pendingCommits.isEmpty() || !pendingFinishCommits.isEmpty();
}
- private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot,
+ private void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot,
final UnaryOperator wrapper) throws DataValidationFailedException {
final Stopwatch elapsed = Stopwatch.createStarted();
@@ -261,18 +276,18 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
- final DataTreeModification mod = wrapper.apply(dataTree.takeSnapshot().newModification());
+ final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
+ final DataTreeModification mod = wrapper.apply(unwrapped);
// delete everything first
- mod.delete(YangInstanceIdentifier.EMPTY);
+ mod.delete(YangInstanceIdentifier.empty());
- final java.util.Optional> maybeNode = snapshot.getRootNode();
+ final Optional> maybeNode = snapshot.getRootNode();
if (maybeNode.isPresent()) {
// Add everything from the remote node back
- mod.write(YangInstanceIdentifier.EMPTY, maybeNode.get());
+ mod.write(YangInstanceIdentifier.empty(), maybeNode.get());
}
mod.ready();
- final DataTreeModification unwrapped = unwrap(mod);
dataTree.validate(unwrapped);
DataTreeCandidateTip candidate = dataTree.prepare(unwrapped);
dataTree.commit(candidate);
@@ -288,21 +303,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ // TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation
applySnapshot(snapshot, UnaryOperator.identity());
}
- private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
- return new PruningDataTreeModification(delegate, dataTree, schemaContext);
- }
-
- private static DataTreeModification unwrap(final DataTreeModification modification) {
- if (modification instanceof PruningDataTreeModification) {
- return ((PruningDataTreeModification)modification).delegate();
- }
- return modification;
- }
-
/**
* Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data
* pruning in an attempt to adjust the state to our current SchemaContext.
@@ -310,17 +315,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoverySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
- applySnapshot(snapshot, this::wrapWithPruning);
+ void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException {
+ // TODO: we should be able to reuse the pruner, provided we are not reentrant
+ final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+ dataSchemaContext);
+ if (snapshot.needsMigration()) {
+ final ReusableNormalizedNodePruner uintPruner = pruner.withUintAdaption();
+ applySnapshot(snapshot.getSnapshot(),
+ delegate -> new PruningDataTreeModification.Proactive(delegate, dataTree, uintPruner));
+ } else {
+ applySnapshot(snapshot.getSnapshot(),
+ delegate -> new PruningDataTreeModification.Reactive(delegate, dataTree, pruner));
+ }
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private void applyRecoveryCandidate(final DataTreeCandidate candidate) {
- final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
- DataTreeCandidates.applyToModification(mod, candidate);
- mod.ready();
+ private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
+ final Entry entry = payload.acquireCandidate();
+ final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
+ final PruningDataTreeModification mod = createPruningModification(unwrapped,
+ NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0);
- final DataTreeModification unwrapped = mod.delegate();
+ DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
+ mod.ready();
LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
try {
@@ -334,6 +351,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
"%s: Failed to apply recovery payload. Modification data was written to file %s",
logContext, file), e);
}
+
+ allMetadataCommittedTransaction(entry.getKey());
+ }
+
+ private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped,
+ final boolean uintAdapting) {
+ // TODO: we should be able to reuse the pruner, provided we are not reentrant
+ final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+ dataSchemaContext);
+ return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption())
+ : new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner);
}
/**
@@ -344,12 +372,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException {
+ void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
if (payload instanceof CommitTransactionPayload) {
- final Entry e =
- ((CommitTransactionPayload) payload).getCandidate();
- applyRecoveryCandidate(e.getValue());
- allMetadataCommittedTransaction(e.getKey());
+ applyRecoveryCandidate((CommitTransactionPayload) payload);
} else if (payload instanceof AbortTransactionPayload) {
allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
} else if (payload instanceof PurgeTransactionPayload) {
@@ -360,17 +385,22 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
} else if (payload instanceof PurgeLocalHistoryPayload) {
allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload) {
+ allMetadataSkipTransactions((SkipTransactionsPayload) payload);
} else {
LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
}
}
- private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
- throws DataValidationFailedException {
+ private void applyReplicatedCandidate(final CommitTransactionPayload payload)
+ throws DataValidationFailedException, IOException {
+ final Entry entry = payload.acquireCandidate();
+ final TransactionIdentifier identifier = entry.getKey();
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
final DataTreeModification mod = dataTree.takeSnapshot().newModification();
- DataTreeCandidates.applyToModification(mod, foreign);
+ // TODO: check version here, which will enable us to perform forward-compatibility transformations
+ DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
mod.ready();
LOG.trace("{}: Applying foreign modification {}", logContext, mod);
@@ -378,6 +408,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
final DataTreeCandidate candidate = dataTree.prepare(mod);
dataTree.commit(candidate);
+ allMetadataCommittedTransaction(identifier);
notifyListeners(candidate);
}
@@ -404,18 +435,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* pre-Boron state -- which limits the number of options here.
*/
if (payload instanceof CommitTransactionPayload) {
- final TransactionIdentifier txId;
if (identifier == null) {
- final Entry e =
- ((CommitTransactionPayload) payload).getCandidate();
- txId = e.getKey();
- applyReplicatedCandidate(txId, e.getValue());
+ applyReplicatedCandidate((CommitTransactionPayload) payload);
} else {
- Verify.verify(identifier instanceof TransactionIdentifier);
- txId = (TransactionIdentifier) identifier;
- payloadReplicationComplete(txId);
+ verify(identifier instanceof TransactionIdentifier);
+ // if we did not track this transaction before, it means that it came from another leader and we are in
+ // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to
+ // the local DataTree and would be lost if it was only applied via payloadReplicationComplete().
+ if (!payloadReplicationComplete((TransactionIdentifier) identifier)) {
+ applyReplicatedCandidate((CommitTransactionPayload) payload);
+ }
}
- allMetadataCommittedTransaction(txId);
+
+ // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed.
+ checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue()
+ .getCandidate());
} else if (payload instanceof AbortTransactionPayload) {
if (identifier != null) {
payloadReplicationComplete((AbortTransactionPayload) payload);
@@ -441,12 +475,36 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
}
allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((SkipTransactionsPayload)payload);
+ }
+ allMetadataSkipTransactions((SkipTransactionsPayload) payload);
} else {
LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
}
}
- private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) {
+ private void checkRootOverwrite(final DataTreeCandidate candidate) {
+ final DatastoreContext datastoreContext = shard.getDatastoreContext();
+ if (!datastoreContext.isSnapshotOnRootOverwrite()) {
+ return;
+ }
+
+ if (!datastoreContext.isPersistent()) {
+ // FIXME: why don't we want a snapshot in non-persistent state?
+ return;
+ }
+
+ // top level container ie "/"
+ if (candidate.getRootPath().isEmpty()
+ && candidate.getRootNode().getModificationType() == ModificationType.WRITE) {
+ LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext);
+ shard.self().tell(new InitiateCaptureSnapshot(), noSender());
+ }
+ }
+
+ private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) {
if (callback != null) {
replicationCallbacks.put(payload, callback);
}
@@ -463,20 +521,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
- private void payloadReplicationComplete(final TransactionIdentifier txId) {
+ private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
final CommitEntry current = pendingFinishCommits.peek();
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
- return;
+ allMetadataCommittedTransaction(txId);
+ return false;
}
if (!current.cohort.getIdentifier().equals(txId)) {
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
current.cohort.getIdentifier(), txId);
- return;
+ allMetadataCommittedTransaction(txId);
+ return false;
}
finishCommit(current.cohort);
+ return true;
}
private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
@@ -515,6 +576,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
+ private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) {
+ final var historyId = payload.getIdentifier();
+ final var txIds = payload.getTransactionIds();
+ for (ShardDataTreeMetadata> m : metadata) {
+ m.onTransactionsSkipped(historyId, txIds);
+ }
+ }
+
/**
* Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
* this method is used for re-establishing state when we are taking over
@@ -527,18 +596,18 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
final boolean closed) {
final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
- Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId,
- existing);
+ checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, existing);
return ret;
}
ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
- @Nullable final Runnable callback) {
+ final @Nullable Runnable callback) {
ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
if (chain == null) {
chain = new ShardDataTreeTransactionChain(historyId, this);
transactionChains.put(historyId, chain);
- replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+ replicatePayload(historyId, CreateLocalHistoryPayload.create(
+ historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
} else if (callback != null) {
callback.run();
}
@@ -547,6 +616,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+ shard.getShardMBean().incrementReadOnlyTransactionCount();
+
if (txId.getHistoryId().getHistoryId() == 0) {
return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
}
@@ -555,6 +626,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+ shard.getShardMBean().incrementReadWriteTransactionCount();
+
if (txId.getHistoryId().getHistoryId() == 0) {
return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot()
.newModification());
@@ -587,18 +660,34 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
- void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+ void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
+ if (commonCloseTransactionChain(id, callback)) {
+ replicatePayload(id, CloseLocalHistoryPayload.create(id,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+ }
+ }
+
+ /**
+ * Close a single transaction chain which is received through ask-based protocol. It does not keep a commit record.
+ *
+ * @param id History identifier
+ */
+ void closeTransactionChain(final LocalHistoryIdentifier id) {
+ commonCloseTransactionChain(id, null);
+ }
+
+ private boolean commonCloseTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
final ShardDataTreeTransactionChain chain = transactionChains.get(id);
if (chain == null) {
LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id);
if (callback != null) {
callback.run();
}
- return;
+ return false;
}
chain.close();
- replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+ return true;
}
/**
@@ -607,7 +696,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
- void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+ void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
if (chain == null) {
LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
@@ -617,14 +706,28 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return;
}
- replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+ replicatePayload(id, PurgeLocalHistoryPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
- Optional readCurrentData() {
- final java.util.Optional> currentState =
- dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
- return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
- YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent();
+ final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds,
+ final Runnable callback) {
+ final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+ if (chain == null) {
+ LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id);
+ if (callback != null) {
+ callback.run();
+ }
+ return;
+ }
+
+ replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+ }
+
+ final Optional readCurrentData() {
+ return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
+ .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
}
public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
@@ -641,7 +744,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
void abortTransaction(final AbstractShardDataTreeTransaction> transaction, final Runnable callback) {
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: aborting transaction {}", logContext, id);
- replicatePayload(id, AbortTransactionPayload.create(id), callback);
+ replicatePayload(id, AbortTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
@Override
@@ -652,20 +756,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@Override
ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
- final java.util.Optional> participatingShardNames) {
+ final Optional> participatingShardNames) {
final DataTreeModification snapshot = transaction.getSnapshot();
+ final TransactionIdentifier id = transaction.getIdentifier();
+ LOG.debug("{}: readying transaction {}", logContext, id);
snapshot.ready();
+ LOG.debug("{}: transaction {} ready", logContext, id);
return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
}
void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
LOG.debug("{}: purging transaction {}", logContext, id);
- replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+ replicatePayload(id, PurgeTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
public Optional> readNode(final YangInstanceIdentifier path) {
- return Optional.fromJavaUtil(dataTree.takeSnapshot().readNode(path));
+ return dataTree.takeSnapshot().readNode(path);
}
DataTreeSnapshot takeSnapshot() {
@@ -738,8 +846,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
- LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", logContext, cohort.getIdentifier(),
- modification, dataTree);
+ LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.getIdentifier(), modification);
+ LOG.trace("{}: Current tree: {}", logContext, dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
} catch (Exception e) {
LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
@@ -873,7 +981,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
processNextPendingTransaction();
}
- private void insertEntry(final Deque queue, final CommitEntry entry, final int atIndex) {
+ private static void insertEntry(final Deque queue, final CommitEntry entry, final int atIndex) {
if (atIndex == 0) {
queue.addFirst(entry);
return;
@@ -891,8 +999,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
tempStack.forEach(queue::addFirst);
}
- private Collection extractPrecedingShardNames(
- final java.util.Optional> participatingShardNames) {
+ private Collection extractPrecedingShardNames(final Optional> participatingShardNames) {
return participatingShardNames.map((Function, Collection>)
set -> set.headSet(shard.getShardName())).orElse(Collections.emptyList());
}
@@ -906,17 +1013,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
- Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
+ checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
final SimpleShardDataTreeCohort current = entry.cohort;
- Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
+ verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
- LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier());
+ final TransactionIdentifier currentId = current.getIdentifier();
+ LOG.debug("{}: Preparing transaction {}", logContext, currentId);
final DataTreeCandidateTip candidate;
try {
candidate = tip.prepare(cohort.getDataTreeModification());
- } catch (RuntimeException e) {
+ LOG.debug("{}: Transaction {} candidate ready", logContext, currentId);
+ } catch (DataValidationFailedException | RuntimeException e) {
failPreCommit(e);
return;
}
@@ -925,14 +1034,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@Override
public void onSuccess(final Void noop) {
// Set the tip of the data tree.
- tip = Verify.verifyNotNull(candidate);
+ tip = verifyNotNull(candidate);
entry.lastAccess = readTime();
pendingTransactions.remove();
pendingCommits.add(entry);
- LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+ LOG.debug("{}: Transaction {} prepared", logContext, currentId);
cohort.successfulPreCommit(candidate);
@@ -972,6 +1081,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return;
}
+ allMetadataCommittedTransaction(txId);
shard.getShardMBean().incrementCommittedTransactionCount();
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
@@ -986,7 +1096,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
final CommitEntry entry = pendingCommits.peek();
- Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
+ checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
final SimpleShardDataTreeCohort current = entry.cohort;
if (!cohort.equals(current)) {
@@ -999,7 +1109,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
final TransactionIdentifier txId = cohort.getIdentifier();
final Payload payload;
try {
- payload = CommitTransactionPayload.create(txId, candidate);
+ payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(),
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
} catch (IOException e) {
LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
pendingCommits.poll().cohort.failedCommit(e);
@@ -1055,7 +1166,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
- final java.util.Optional> participatingShardNames) {
+ final Optional> participatingShardNames) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
COMMIT_STEP_TIMEOUT), participatingShardNames);
@@ -1066,7 +1177,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
// Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
// the newReadWriteTransaction()
ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
- final java.util.Optional> participatingShardNames) {
+ final Optional> participatingShardNames) {
if (txId.getHistoryId().getHistoryId() == 0) {
return createReadyCohort(txId, mod, participatingShardNames);
}
@@ -1076,7 +1187,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
- final Function> accessTimeUpdater) {
+ final Function accessTimeUpdater) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = readTime();
@@ -1094,9 +1205,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return;
}
- final Optional updateOpt = accessTimeUpdater.apply(currentTx.cohort);
+ final OptionalLong updateOpt = accessTimeUpdater.apply(currentTx.cohort);
if (updateOpt.isPresent()) {
- final long newAccess = updateOpt.get().longValue();
+ final long newAccess = updateOpt.getAsLong();
final long newDelta = now - newAccess;
if (newDelta < delta) {
LOG.debug("{}: Updated current transaction {} access time", logContext,
@@ -1202,7 +1313,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return false;
}
- DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree);
+ DataTreeTip newTip = requireNonNullElse(first.cohort.getCandidate(), dataTree);
while (it.hasNext()) {
final CommitEntry e = it.next();
if (cohort.equals(e.cohort)) {
@@ -1215,7 +1326,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return true;
} else {
- newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), newTip);
+ newTip = requireNonNullElse(e.cohort.getCandidate(), newTip);
}
}
@@ -1224,8 +1335,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private void rebaseTransactions(final Iterator iter, @Nonnull final DataTreeTip newTip) {
- tip = Preconditions.checkNotNull(newTip);
+ private void rebaseTransactions(final Iterator iter, final @NonNull DataTreeTip newTip) {
+ tip = requireNonNull(newTip);
while (iter.hasNext()) {
final SimpleShardDataTreeCohort cohort = iter.next().cohort;
if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {