*/
package org.opendaylight.controller.cluster.datastore;
+import static akka.actor.ActorRef.noSender;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.node.utils.transformer.ReusableNormalizedNodePruner;
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private int currentTransactionBatch;
- ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree,
+ ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final DataTree dataTree,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final String logContext,
final ShardDataTreeMetadata<?>... metadata) {
tip = dataTree;
}
- ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+ ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType,
final YangInstanceIdentifier root,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final String logContext,
}
@VisibleForTesting
- public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
+ public ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType) {
this(shard, schemaContext, treeType, YangInstanceIdentifier.empty(),
new DefaultShardDataTreeChangeListenerPublisher(""), "");
}
return schemaContext;
}
- void updateSchemaContext(final SchemaContext newSchemaContext) {
- dataTree.setSchemaContext(newSchemaContext);
- this.schemaContext = requireNonNull(newSchemaContext);
+ void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
+ dataTree.setEffectiveModelContext(newSchemaContext);
+ this.schemaContext = newSchemaContext;
this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
* @throws DataValidationFailedException when the snapshot fails to apply
*/
void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ // TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation
applySnapshot(snapshot, UnaryOperator.identity());
}
- private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
- return new PruningDataTreeModification(delegate, dataTree,
- // TODO: we should be able to reuse the pruner, provided we are not reentrant
- ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext));
- }
-
/**
* Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data
* pruning in an attempt to adjust the state to our current SchemaContext.
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoverySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
- applySnapshot(snapshot, this::wrapWithPruning);
+ void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException {
+ // TODO: we should be able to reuse the pruner, provided we are not reentrant
+ final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+ dataSchemaContext);
+ if (snapshot.needsMigration()) {
+ final ReusableNormalizedNodePruner uintPruner = pruner.withUintAdaption();
+ applySnapshot(snapshot.getSnapshot(),
+ delegate -> new PruningDataTreeModification.Proactive(delegate, dataTree, uintPruner));
+ } else {
+ applySnapshot(snapshot.getSnapshot(),
+ delegate -> new PruningDataTreeModification.Reactive(delegate, dataTree, pruner));
+ }
}
@SuppressWarnings("checkstyle:IllegalCatch")
private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
- final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.getCandidate();
+ final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
- // FIXME: CONTROLLER-1923: examine version first
- final PruningDataTreeModification mod = wrapWithPruning(unwrapped);
+ final PruningDataTreeModification mod = createPruningModification(unwrapped,
+ NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0);
+
DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
mod.ready();
-
LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
try {
allMetadataCommittedTransaction(entry.getKey());
}
+ private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped,
+ final boolean uintAdapting) {
+ // TODO: we should be able to reuse the pruner, provided we are not reentrant
+ final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+ dataSchemaContext);
+ return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption())
+ : new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner);
+ }
+
/**
* Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data
* pruning in an attempt to adjust the state to our current SchemaContext.
private void applyReplicatedCandidate(final CommitTransactionPayload payload)
throws DataValidationFailedException, IOException {
- final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.getCandidate();
+ final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
final TransactionIdentifier identifier = entry.getKey();
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
applyReplicatedCandidate((CommitTransactionPayload) payload);
} else {
verify(identifier instanceof TransactionIdentifier);
- payloadReplicationComplete((TransactionIdentifier) identifier);
+ // if we did not track this transaction before, it means that it came from another leader and we are in
+ // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to
+ // the local DataTree and would be lost if it was only applied via payloadReplicationComplete().
+ if (!payloadReplicationComplete((TransactionIdentifier) identifier)) {
+ applyReplicatedCandidate((CommitTransactionPayload) payload);
+ }
}
+
+ // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed.
+ checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue()
+ .getCandidate());
} else if (payload instanceof AbortTransactionPayload) {
if (identifier != null) {
payloadReplicationComplete((AbortTransactionPayload) payload);
}
}
+ private void checkRootOverwrite(DataTreeCandidate candidate) {
+ final DatastoreContext datastoreContext = shard.getDatastoreContext();
+ if (!datastoreContext.isSnapshotOnRootOverwrite()) {
+ return;
+ }
+
+ if (!datastoreContext.isPersistent()) {
+ return;
+ }
+
+ if (candidate.getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) {
+ return;
+ }
+
+ // top level container ie "/"
+ if ((candidate.getRootPath().equals(YangInstanceIdentifier.empty())
+ && candidate.getRootNode().getModificationType().equals(ModificationType.WRITE))) {
+ LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext);
+ shard.self().tell(new InitiateCaptureSnapshot(), noSender());
+ return;
+ }
+ }
+
private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) {
if (callback != null) {
replicationCallbacks.put(payload, callback);
}
}
- private void payloadReplicationComplete(final TransactionIdentifier txId) {
+ private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
final CommitEntry current = pendingFinishCommits.peek();
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
allMetadataCommittedTransaction(txId);
- return;
+ return false;
}
if (!current.cohort.getIdentifier().equals(txId)) {
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
current.cohort.getIdentifier(), txId);
allMetadataCommittedTransaction(txId);
- return;
+ return false;
}
finishCommit(current.cohort);
+ return true;
}
private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
final TransactionIdentifier txId = cohort.getIdentifier();
final Payload payload;
try {
- payload = CommitTransactionPayload.create(txId, candidate,
+ payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(),
shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
} catch (IOException e) {
LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);