X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=3665fad559584db6eab228aecdb7c9920df9f5d3;hb=refs%2Fchanges%2F16%2F87616%2F4;hp=a097b6808152aaa7b21a0abf31626d8815cf6b11;hpb=04f22714755ee8e52a63386cc1f4290402659838;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index a097b68081..3665fad559 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -311,32 +311,27 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { */ void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException { // TODO: we should be able to reuse the pruner, provided we are not reentrant - ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext); + final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext( + dataSchemaContext); if (snapshot.needsMigration()) { - pruner = pruner.withUintAdaption(); + final ReusableNormalizedNodePruner uintPruner = pruner.withUintAdaption(); + applySnapshot(snapshot.getSnapshot(), + delegate -> new PruningDataTreeModification.Proactive(delegate, dataTree, uintPruner)); + } else { + applySnapshot(snapshot.getSnapshot(), + delegate -> new PruningDataTreeModification.Reactive(delegate, dataTree, pruner)); } - - // For lambda below - final ReusableNormalizedNodePruner finalPruner = pruner; - applySnapshot(snapshot.getSnapshot(), - delegate -> new PruningDataTreeModification(delegate, dataTree, finalPruner)); } @SuppressWarnings("checkstyle:IllegalCatch") private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException { - final Entry entry = payload.getCandidate(); + final Entry entry = payload.acquireCandidate(); final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification(); + final PruningDataTreeModification mod = createPruningModification(unwrapped, + NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0); - // TODO: we should be able to reuse the pruner, provided we are not reentrant - ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext); - if (NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0) { - pruner = pruner.withUintAdaption(); - } - - final PruningDataTreeModification mod = new PruningDataTreeModification(unwrapped, dataTree, pruner); DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate()); mod.ready(); - LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped); try { @@ -354,6 +349,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { allMetadataCommittedTransaction(entry.getKey()); } + private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped, + final boolean uintAdapting) { + // TODO: we should be able to reuse the pruner, provided we are not reentrant + final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext( + dataSchemaContext); + return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption()) + : new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner); + } + /** * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data * pruning in an attempt to adjust the state to our current SchemaContext. @@ -382,7 +386,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private void applyReplicatedCandidate(final CommitTransactionPayload payload) throws DataValidationFailedException, IOException { - final Entry entry = payload.getCandidate(); + final Entry entry = payload.acquireCandidate(); final TransactionIdentifier identifier = entry.getKey(); LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); @@ -427,7 +431,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { applyReplicatedCandidate((CommitTransactionPayload) payload); } else { verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier) identifier); + // if we did not track this transaction before, it means that it came from another leader and we are in + // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to + // the local DataTree and would be lost if it was only applied via payloadReplicationComplete(). + if (!payloadReplicationComplete((TransactionIdentifier) identifier)) { + applyReplicatedCandidate((CommitTransactionPayload) payload); + } } } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { @@ -476,22 +485,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private void payloadReplicationComplete(final TransactionIdentifier txId) { + private boolean payloadReplicationComplete(final TransactionIdentifier txId) { final CommitEntry current = pendingFinishCommits.peek(); if (current == null) { LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); allMetadataCommittedTransaction(txId); - return; + return false; } if (!current.cohort.getIdentifier().equals(txId)) { LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext, current.cohort.getIdentifier(), txId); allMetadataCommittedTransaction(txId); - return; + return false; } finishCommit(current.cohort); + return true; } private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {