X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=43a3f6dc49cb201ea34c167a6d57c5794740b6f8;hb=55a9b9f42a14c56060f74b38f84d444c0fbfecc4;hp=5c94a8e36161bdd7fc1f9180bc9481e45565aba1;hpb=555663eec40d16fbc622bb5de1de37f2253c359b;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 5c94a8e361..43a3f6dc49 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -7,14 +7,16 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; @@ -33,6 +35,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.OptionalLong; import java.util.Queue; import java.util.SortedSet; @@ -99,7 +102,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { long lastAccess; CommitEntry(final SimpleShardDataTreeCohort cohort, final long now) { - this.cohort = Preconditions.checkNotNull(cohort); + this.cohort = requireNonNull(cohort); lastAccess = now; } @@ -155,12 +158,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { - this.dataTree = Preconditions.checkNotNull(dataTree); + this.dataTree = requireNonNull(dataTree); updateSchemaContext(schemaContext); - this.shard = Preconditions.checkNotNull(shard); - this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher); - this.logContext = Preconditions.checkNotNull(logContext); + this.shard = requireNonNull(shard); + this.treeChangeListenerPublisher = requireNonNull(treeChangeListenerPublisher); + this.logContext = requireNonNull(logContext); this.metadata = ImmutableList.copyOf(metadata); tip = dataTree; } @@ -184,7 +187,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { - this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, + this(shard, schemaContext, treeType, YangInstanceIdentifier.empty(), new DefaultShardDataTreeChangeListenerPublisher(""), ""); } @@ -206,7 +209,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { void updateSchemaContext(final SchemaContext newSchemaContext) { dataTree.setSchemaContext(newSchemaContext); - this.schemaContext = Preconditions.checkNotNull(newSchemaContext); + this.schemaContext = requireNonNull(newSchemaContext); this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); } @@ -220,7 +223,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @return A state snapshot */ @NonNull ShardDataTreeSnapshot takeStateSnapshot() { - final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get(); + final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get(); final Builder>, ShardDataTreeSnapshotMetadata> metaBuilder = ImmutableMap.builder(); @@ -264,12 +267,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeModification mod = wrapper.apply(dataTree.takeSnapshot().newModification()); // delete everything first - mod.delete(YangInstanceIdentifier.EMPTY); + mod.delete(YangInstanceIdentifier.empty()); - final java.util.Optional> maybeNode = snapshot.getRootNode(); + final Optional> maybeNode = snapshot.getRootNode(); if (maybeNode.isPresent()) { // Add everything from the remote node back - mod.write(YangInstanceIdentifier.EMPTY, maybeNode.get()); + mod.write(YangInstanceIdentifier.empty(), maybeNode.get()); } mod.ready(); @@ -316,9 +319,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressWarnings("checkstyle:IllegalCatch") - private void applyRecoveryCandidate(final DataTreeCandidate candidate) { + private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException { + final Entry entry = payload.getCandidate(); + final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification()); - DataTreeCandidates.applyToModification(mod, candidate); + DataTreeCandidates.applyToModification(mod, entry.getValue()); mod.ready(); final DataTreeModification unwrapped = mod.delegate(); @@ -335,6 +340,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { "%s: Failed to apply recovery payload. Modification data was written to file %s", logContext, file), e); } + + allMetadataCommittedTransaction(entry.getKey()); } /** @@ -347,10 +354,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { */ void applyRecoveryPayload(final @NonNull Payload payload) throws IOException { if (payload instanceof CommitTransactionPayload) { - final Entry e = - ((CommitTransactionPayload) payload).getCandidate(); - applyRecoveryCandidate(e.getValue()); - allMetadataCommittedTransaction(e.getKey()); + applyRecoveryCandidate((CommitTransactionPayload) payload); } else if (payload instanceof AbortTransactionPayload) { allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } else if (payload instanceof PurgeTransactionPayload) { @@ -366,12 +370,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign) - throws DataValidationFailedException { + private void applyReplicatedCandidate(final CommitTransactionPayload payload) + throws DataValidationFailedException, IOException { + final Entry entry = payload.getCandidate(); + final TransactionIdentifier identifier = entry.getKey(); LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); final DataTreeModification mod = dataTree.takeSnapshot().newModification(); - DataTreeCandidates.applyToModification(mod, foreign); + DataTreeCandidates.applyToModification(mod, entry.getValue()); mod.ready(); LOG.trace("{}: Applying foreign modification {}", logContext, mod); @@ -407,11 +413,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { */ if (payload instanceof CommitTransactionPayload) { if (identifier == null) { - final Entry e = - ((CommitTransactionPayload) payload).getCandidate(); - applyReplicatedCandidate(e.getKey(), e.getValue()); + applyReplicatedCandidate((CommitTransactionPayload) payload); } else { - Verify.verify(identifier instanceof TransactionIdentifier); + verify(identifier instanceof TransactionIdentifier); payloadReplicationComplete((TransactionIdentifier) identifier); } } else if (payload instanceof AbortTransactionPayload) { @@ -527,8 +531,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final boolean closed) { final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this); final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret); - Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, - existing); + checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, existing); return ret; } @@ -643,10 +646,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } Optional readCurrentData() { - final java.util.Optional> currentState = - dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); - return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( - YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); + return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()) + .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state)); } public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, @@ -675,7 +676,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction, - final java.util.Optional> participatingShardNames) { + final Optional> participatingShardNames) { final DataTreeModification snapshot = transaction.getSnapshot(); final TransactionIdentifier id = transaction.getIdentifier(); LOG.debug("{}: readying transaction {}", logContext, id); @@ -692,7 +693,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } public Optional> readNode(final YangInstanceIdentifier path) { - return Optional.fromJavaUtil(dataTree.takeSnapshot().readNode(path)); + return dataTree.takeSnapshot().readNode(path); } DataTreeSnapshot takeSnapshot() { @@ -765,8 +766,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // For debugging purposes, allow dumping of the modification. Coupled with the above // precondition log, it should allow us to understand what went on. - LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", logContext, cohort.getIdentifier(), - modification, dataTree); + LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.getIdentifier(), modification); + LOG.trace("{}: Current tree: {}", logContext, dataTree); cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e); } catch (Exception e) { LOG.warn("{}: Unexpected failure in validation phase", logContext, e); @@ -918,8 +919,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { tempStack.forEach(queue::addFirst); } - private Collection extractPrecedingShardNames( - final java.util.Optional> participatingShardNames) { + private Collection extractPrecedingShardNames(final Optional> participatingShardNames) { return participatingShardNames.map((Function, Collection>) set -> set.headSet(shard.getShardName())).orElse(Collections.emptyList()); } @@ -933,10 +933,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @SuppressWarnings("checkstyle:IllegalCatch") void startPreCommit(final SimpleShardDataTreeCohort cohort) { final CommitEntry entry = pendingTransactions.peek(); - Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort); + checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort); final SimpleShardDataTreeCohort current = entry.cohort; - Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); + verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); final TransactionIdentifier currentId = current.getIdentifier(); LOG.debug("{}: Preparing transaction {}", logContext, currentId); @@ -945,7 +945,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { try { candidate = tip.prepare(cohort.getDataTreeModification()); LOG.debug("{}: Transaction {} candidate ready", logContext, currentId); - } catch (RuntimeException e) { + } catch (DataValidationFailedException | RuntimeException e) { failPreCommit(e); return; } @@ -954,7 +954,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override public void onSuccess(final Void noop) { // Set the tip of the data tree. - tip = Verify.verifyNotNull(candidate); + tip = verifyNotNull(candidate); entry.lastAccess = readTime(); @@ -1016,7 +1016,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) { final CommitEntry entry = pendingCommits.peek(); - Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort); + checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort); final SimpleShardDataTreeCohort current = entry.cohort; if (!cohort.equals(current)) { @@ -1086,7 +1086,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, - final java.util.Optional> participatingShardNames) { + final Optional> participatingShardNames) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf, COMMIT_STEP_TIMEOUT), participatingShardNames); @@ -1097,7 +1097,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics // the newReadWriteTransaction() ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, - final java.util.Optional> participatingShardNames) { + final Optional> participatingShardNames) { if (txId.getHistoryId().getHistoryId() == 0) { return createReadyCohort(txId, mod, participatingShardNames); } @@ -1256,7 +1256,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @SuppressWarnings("checkstyle:IllegalCatch") private void rebaseTransactions(final Iterator iter, final @NonNull DataTreeTip newTip) { - tip = Preconditions.checkNotNull(newTip); + tip = requireNonNull(newTip); while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort; if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {