X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=3665fad559584db6eab228aecdb7c9920df9f5d3;hb=refs%2Fchanges%2F16%2F87616%2F4;hp=016b078d5c002f1154aa6908920f4f27c65a2c52;hpb=ce9776465d3be8daabd43a9fae20406b5443db54;p=controller.git

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index 016b078d5c..3665fad559 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
@@ -57,11 +57,14 @@ import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifia
 import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
 import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -83,6 +86,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -294,15 +298,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
     void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+        // TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation
         applySnapshot(snapshot, UnaryOperator.identity());
     }
 
-    private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
-        return new PruningDataTreeModification(delegate, dataTree,
-                // TODO: we should be able to reuse the pruner, provided we are not reentrant
-                ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext));
-    }
-
     /**
      * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data
      * pruning in an attempt to adjust the state to our current SchemaContext.
@@ -310,18 +309,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param snapshot Snapshot that needs to be applied
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyRecoverySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
-        applySnapshot(snapshot, this::wrapWithPruning);
+    void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException {
+        // TODO: we should be able to reuse the pruner, provided we are not reentrant
+        final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+            dataSchemaContext);
+        if (snapshot.needsMigration()) {
+            final ReusableNormalizedNodePruner uintPruner = pruner.withUintAdaption();
+            applySnapshot(snapshot.getSnapshot(),
+                delegate -> new PruningDataTreeModification.Proactive(delegate, dataTree, uintPruner));
+        } else {
+            applySnapshot(snapshot.getSnapshot(),
+                delegate -> new PruningDataTreeModification.Reactive(delegate, dataTree, pruner));
+        }
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
-        final Entry<TransactionIdentifier, DataTreeCandidate> entry = payload.getCandidate();
+        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
         final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
-        final PruningDataTreeModification mod = wrapWithPruning(unwrapped);
-        DataTreeCandidates.applyToModification(mod, entry.getValue());
-        mod.ready();
+        final PruningDataTreeModification mod = createPruningModification(unwrapped,
+            NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0);
+        DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
+        mod.ready();
 
         LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
 
         try {
@@ -339,6 +349,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         allMetadataCommittedTransaction(entry.getKey());
     }
 
+    private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped,
+            final boolean uintAdapting) {
+        // TODO: we should be able to reuse the pruner, provided we are not reentrant
+        final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+            dataSchemaContext);
+        return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption())
+                : new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner);
+    }
+
     /**
      * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data
      * pruning in an attempt to adjust the state to our current SchemaContext.
@@ -367,12 +386,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private void applyReplicatedCandidate(final CommitTransactionPayload payload)
             throws DataValidationFailedException, IOException {
-        final Entry<TransactionIdentifier, DataTreeCandidate> entry = payload.getCandidate();
+        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
         final TransactionIdentifier identifier = entry.getKey();
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
         final DataTreeModification mod = dataTree.takeSnapshot().newModification();
-        DataTreeCandidates.applyToModification(mod, entry.getValue());
+        // TODO: check version here, which will enable us to perform forward-compatibility transformations
+        DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
         mod.ready();
 
         LOG.trace("{}: Applying foreign modification {}", logContext, mod);
@@ -411,7 +431,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 applyReplicatedCandidate((CommitTransactionPayload) payload);
             } else {
                 verify(identifier instanceof TransactionIdentifier);
-                payloadReplicationComplete((TransactionIdentifier) identifier);
+                // If we did not track this transaction before, it means that it came from another leader and we are in
+                // the process of committing it while in PreLeader state. That means that it has not yet been committed
+                // to the local DataTree and would be lost if it was only applied via payloadReplicationComplete().
+                if (!payloadReplicationComplete((TransactionIdentifier) identifier)) {
+                    applyReplicatedCandidate((CommitTransactionPayload) payload);
+                }
             }
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
@@ -460,22 +485,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    private void payloadReplicationComplete(final TransactionIdentifier txId) {
+    private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
             LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
             allMetadataCommittedTransaction(txId);
-            return;
+            return false;
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
             LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}",
                 logContext, current.cohort.getIdentifier(), txId);
             allMetadataCommittedTransaction(txId);
-            return;
+            return false;
         }
 
         finishCommit(current.cohort);
+        return true;
     }
 
     private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
@@ -1024,7 +1050,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final TransactionIdentifier txId = cohort.getIdentifier();
         final Payload payload;
         try {
-            payload = CommitTransactionPayload.create(txId, candidate,
+            payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(),
                 shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
         } catch (IOException e) {
             LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
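
Editorial note: the core of this change is the version gate in applyRecoveryCandidate()/applyRecoverySnapshot(): candidates serialized with a pre-Magnesium stream version are replayed through a Proactive pruning modification with uint adaptation, while newer candidates take the cheaper Reactive path. The following is a minimal, self-contained sketch of that decision only; StreamVersion, Pruning and choosePruning() are hypothetical stand-ins for illustration, not the actual NormalizedNodeStreamVersion or PruningDataTreeModification API.

    // Sketch of the version-gated pruning choice; all names here are simplified stand-ins.
    public final class PruningChoiceSketch {
        enum StreamVersion { LITHIUM, NEON_SR2, SODIUM_SR1, MAGNESIUM }

        enum Pruning { PROACTIVE_WITH_UINT_ADAPTATION, REACTIVE }

        static Pruning choosePruning(final StreamVersion payloadVersion) {
            // Same shape as the diff's NormalizedNodeStreamVersion.MAGNESIUM.compareTo(version) > 0:
            // anything older than the Magnesium stream format still needs uint adaptation.
            return StreamVersion.MAGNESIUM.compareTo(payloadVersion) > 0
                    ? Pruning.PROACTIVE_WITH_UINT_ADAPTATION : Pruning.REACTIVE;
        }

        public static void main(final String[] args) {
            System.out.println(choosePruning(StreamVersion.SODIUM_SR1)); // PROACTIVE_WITH_UINT_ADAPTATION
            System.out.println(choosePruning(StreamVersion.MAGNESIUM));  // REACTIVE
        }
    }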
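
The other behavioural change is that payloadReplicationComplete() now reports whether the payload matched a locally tracked transaction, and its caller falls back to applyReplicatedCandidate() when it did not (the PreLeader case described in the added comment). Below is a rough sketch of that contract, assuming simplified stand-in types rather than the real CommitEntry/cohort machinery.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Objects;

    // Sketch of the "apply directly if not tracked" fallback; types are hypothetical stand-ins.
    public final class PreLeaderFallbackSketch {
        private final Deque<String> pendingFinishCommits = new ArrayDeque<>();

        // Returns true only if the transaction was at the head of the locally tracked queue.
        boolean payloadReplicationComplete(final String txId) {
            final String current = pendingFinishCommits.peek();
            if (current == null || !Objects.equals(current, txId)) {
                return false;
            }
            pendingFinishCommits.poll(); // stands in for finishCommit(current.cohort)
            return true;
        }

        void applyCommit(final String txId) {
            if (!payloadReplicationComplete(txId)) {
                // Not tracked locally: apply the replicated candidate directly so the data is not lost.
                applyReplicatedCandidate(txId);
            }
        }

        private void applyReplicatedCandidate(final String txId) {
            System.out.println("applying foreign candidate " + txId);
        }

        public static void main(final String[] args) {
            new PreLeaderFallbackSketch().applyCommit("member-1-txn-0");
        }
    }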