X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=bd5d7360b268fdd24974b4d716a73743d61093d3;hp=614a346068c6163442420fda028992b170977c83;hb=abaef4a5ae37f27542155457fe7306a4662b1eeb;hpb=67c6623cebe8f1d735df90eccc85fd223e6c021f diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 614a346068..bd5d7360b2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -7,15 +7,16 @@ */ package org.opendaylight.controller.cluster.datastore; +import static akka.actor.ActorRef.noSender; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElse; import akka.actor.ActorRef; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -50,19 +51,23 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.cluster.datastore.node.utils.transformer.ReusableNormalizedNodePruner; import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload; +import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion; import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; @@ -81,9 +86,12 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,7 +162,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private int currentTransactionBatch; - ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree, + ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final DataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { @@ -168,7 +176,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { tip = dataTree; } - ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, + ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType, final YangInstanceIdentifier root, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final String logContext, @@ -186,7 +194,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @VisibleForTesting - public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { + public ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType) { this(shard, schemaContext, treeType, YangInstanceIdentifier.empty(), new DefaultShardDataTreeChangeListenerPublisher(""), ""); } @@ -207,9 +215,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return schemaContext; } - void updateSchemaContext(final SchemaContext newSchemaContext) { - dataTree.setSchemaContext(newSchemaContext); - this.schemaContext = requireNonNull(newSchemaContext); + void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) { + dataTree.setEffectiveModelContext(newSchemaContext); + this.schemaContext = newSchemaContext; this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); } @@ -223,7 +231,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @return A state snapshot */ @NonNull ShardDataTreeSnapshot takeStateSnapshot() { - final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get(); + final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get(); final Builder>, ShardDataTreeSnapshotMetadata> metaBuilder = ImmutableMap.builder(); @@ -265,18 +273,18 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - final DataTreeModification mod = wrapper.apply(dataTree.takeSnapshot().newModification()); + final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification(); + final DataTreeModification mod = wrapper.apply(unwrapped); // delete everything first mod.delete(YangInstanceIdentifier.empty()); - final Optional> maybeNode = snapshot.getRootNode(); + final Optional maybeNode = snapshot.getRootNode(); if (maybeNode.isPresent()) { // Add everything from the remote node back mod.write(YangInstanceIdentifier.empty(), maybeNode.get()); } mod.ready(); - final DataTreeModification unwrapped = unwrap(mod); dataTree.validate(unwrapped); DataTreeCandidateTip candidate = dataTree.prepare(unwrapped); dataTree.commit(candidate); @@ -293,20 +301,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @throws DataValidationFailedException when the snapshot fails to apply */ void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + // TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation applySnapshot(snapshot, UnaryOperator.identity()); } - private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) { - return new PruningDataTreeModification(delegate, dataTree, dataSchemaContext); - } - - private static DataTreeModification unwrap(final DataTreeModification modification) { - if (modification instanceof PruningDataTreeModification) { - return ((PruningDataTreeModification)modification).delegate(); - } - return modification; - } - /** * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data * pruning in an attempt to adjust the state to our current SchemaContext. @@ -314,19 +312,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoverySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { - applySnapshot(snapshot, this::wrapWithPruning); + void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException { + // TODO: we should be able to reuse the pruner, provided we are not reentrant + final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext( + dataSchemaContext); + if (snapshot.needsMigration()) { + final ReusableNormalizedNodePruner uintPruner = pruner.withUintAdaption(); + applySnapshot(snapshot.getSnapshot(), + delegate -> new PruningDataTreeModification.Proactive(delegate, dataTree, uintPruner)); + } else { + applySnapshot(snapshot.getSnapshot(), + delegate -> new PruningDataTreeModification.Reactive(delegate, dataTree, pruner)); + } } @SuppressWarnings("checkstyle:IllegalCatch") private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException { - final Entry entry = payload.getCandidate(); + final Entry entry = payload.acquireCandidate(); + final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification(); + final PruningDataTreeModification mod = createPruningModification(unwrapped, + NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0); - final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification()); - DataTreeCandidates.applyToModification(mod, entry.getValue()); + DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate()); mod.ready(); - - final DataTreeModification unwrapped = mod.delegate(); LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped); try { @@ -344,6 +352,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { allMetadataCommittedTransaction(entry.getKey()); } + private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped, + final boolean uintAdapting) { + // TODO: we should be able to reuse the pruner, provided we are not reentrant + final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext( + dataSchemaContext); + return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption()) + : new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner); + } + /** * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data * pruning in an attempt to adjust the state to our current SchemaContext. @@ -372,12 +389,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private void applyReplicatedCandidate(final CommitTransactionPayload payload) throws DataValidationFailedException, IOException { - final Entry entry = payload.getCandidate(); + final Entry entry = payload.acquireCandidate(); final TransactionIdentifier identifier = entry.getKey(); LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); final DataTreeModification mod = dataTree.takeSnapshot().newModification(); - DataTreeCandidates.applyToModification(mod, entry.getValue()); + // TODO: check version here, which will enable us to perform forward-compatibility transformations + DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate()); mod.ready(); LOG.trace("{}: Applying foreign modification {}", logContext, mod); @@ -416,8 +434,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { applyReplicatedCandidate((CommitTransactionPayload) payload); } else { verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier) identifier); + // if we did not track this transaction before, it means that it came from another leader and we are in + // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to + // the local DataTree and would be lost if it was only applied via payloadReplicationComplete(). + if (!payloadReplicationComplete((TransactionIdentifier) identifier)) { + applyReplicatedCandidate((CommitTransactionPayload) payload); + } } + + // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed. + checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue() + .getCandidate()); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); @@ -448,6 +475,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + private void checkRootOverwrite(final DataTreeCandidate candidate) { + final DatastoreContext datastoreContext = shard.getDatastoreContext(); + if (!datastoreContext.isSnapshotOnRootOverwrite()) { + return; + } + + if (!datastoreContext.isPersistent()) { + // FIXME: why don't we want a snapshot in non-persistent state? + return; + } + + // top level container ie "/" + if (candidate.getRootPath().isEmpty() + && candidate.getRootNode().getModificationType() == ModificationType.WRITE) { + LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext); + shard.self().tell(new InitiateCaptureSnapshot(), noSender()); + } + } + private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) { if (callback != null) { replicationCallbacks.put(payload, callback); @@ -465,22 +511,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private void payloadReplicationComplete(final TransactionIdentifier txId) { + private boolean payloadReplicationComplete(final TransactionIdentifier txId) { final CommitEntry current = pendingFinishCommits.peek(); if (current == null) { LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); allMetadataCommittedTransaction(txId); - return; + return false; } if (!current.cohort.getIdentifier().equals(txId)) { LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext, current.cohort.getIdentifier(), txId); allMetadataCommittedTransaction(txId); - return; + return false; } finishCommit(current.cohort); + return true; } private void allMetadataAbortedTransaction(final TransactionIdentifier txId) { @@ -692,7 +739,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } - public Optional> readNode(final YangInstanceIdentifier path) { + public Optional readNode(final YangInstanceIdentifier path) { return dataTree.takeSnapshot().readNode(path); } @@ -945,7 +992,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { try { candidate = tip.prepare(cohort.getDataTreeModification()); LOG.debug("{}: Transaction {} candidate ready", logContext, currentId); - } catch (RuntimeException e) { + } catch (DataValidationFailedException | RuntimeException e) { failPreCommit(e); return; } @@ -1029,7 +1076,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final TransactionIdentifier txId = cohort.getIdentifier(); final Payload payload; try { - payload = CommitTransactionPayload.create(txId, candidate, + payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(), shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()); } catch (IOException e) { LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e); @@ -1233,7 +1280,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return false; } - DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree); + DataTreeTip newTip = requireNonNullElse(first.cohort.getCandidate(), dataTree); while (it.hasNext()) { final CommitEntry e = it.next(); if (cohort.equals(e.cohort)) { @@ -1246,7 +1293,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return true; } else { - newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), newTip); + newTip = requireNonNullElse(e.cohort.getCandidate(), newTip); } }