X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=5b015ead9acee3034600a86c6bad4a93ea4c62b5;hb=refs%2Fchanges%2F86%2F48686%2F16;hp=f1d37872fd20262ae4e3c114f8ad96c6cf7e7e75;hpb=6276a65120a674b545ea787a5e1d9311bcdbf2af;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index f1d37872fd..5b015ead9a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -13,11 +13,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.primitives.UnsignedLong; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.File; import java.io.IOException; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayDeque; @@ -42,6 +45,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.CommitTransaction import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; +import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; @@ -75,6 +79,7 @@ import scala.concurrent.duration.Duration; * e.g. it does not expose public interfaces and assumes it is only ever called from a * single thread. * + *

* This class is not part of the API contract and is subject to change at any time. */ @NotThreadSafe @@ -132,10 +137,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { new DefaultShardDataChangeListenerPublisher(), ""); } - String logContext() { + final String logContext() { return logContext; } + final Ticker ticker() { + return shard.ticker(); + } + public TipProducingDataTree getDataTree() { return dataTree; } @@ -144,9 +153,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return schemaContext; } - void updateSchemaContext(final SchemaContext schemaContext) { - dataTree.setSchemaContext(schemaContext); - this.schemaContext = Preconditions.checkNotNull(schemaContext); + void updateSchemaContext(final SchemaContext newSchemaContext) { + dataTree.setSchemaContext(newSchemaContext); + this.schemaContext = Preconditions.checkNotNull(newSchemaContext); } /** @@ -160,7 +169,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ImmutableMap.builder(); for (ShardDataTreeMetadata m : metadata) { - final ShardDataTreeSnapshotMetadata meta = m.toStapshot(); + final ShardDataTreeSnapshotMetadata meta = m.toSnapshot(); if (meta != null) { metaBuilder.put(meta.getType(), meta); } @@ -169,7 +178,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build()); } - private void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot, + private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot, final UnaryOperator wrapper) throws DataValidationFailedException { final Stopwatch elapsed = Stopwatch.createStarted(); @@ -206,10 +215,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeModification unwrapped = unwrap(mod); dataTree.validate(unwrapped); - dataTree.commit(dataTree.prepare(unwrapped)); + DataTreeCandidateTip candidate = dataTree.prepare(unwrapped); + dataTree.commit(candidate); + notifyListeners(candidate); + LOG.debug("{}: state snapshot applied in %s", logContext, elapsed); } + /** + * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and + * does not perform any pruning. + * + * @param snapshot Snapshot that needs to be applied + * @throws DataValidationFailedException when the snapshot fails to apply + */ + void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + applySnapshot(snapshot, UnaryOperator.identity()); + } + private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) { return new PruningDataTreeModification(delegate, dataTree, schemaContext); } @@ -232,18 +255,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { applySnapshot(snapshot, this::wrapWithPruning); } - - /** - * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and - * does not perform any pruning. - * - * @param snapshot Snapshot that needs to be applied - * @throws DataValidationFailedException when the snapshot fails to apply - */ - void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { - applySnapshot(snapshot, UnaryOperator.identity()); - } - + @SuppressWarnings("checkstyle:IllegalCatch") private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException { final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification()); DataTreeCandidates.applyToModification(mod, candidate); @@ -252,8 +264,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeModification unwrapped = mod.delegate(); LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped); - dataTree.validate(unwrapped); - dataTree.commit(dataTree.prepare(unwrapped)); + try { + dataTree.validate(unwrapped); + dataTree.commit(dataTree.prepare(unwrapped)); + } catch (Exception e) { + File file = new File(System.getProperty("karaf.data", "."), + "failed-recovery-payload-" + logContext + ".out"); + DataTreeModificationOutput.toFile(file, unwrapped); + throw new IllegalStateException(String.format( + "%s: Failed to apply recovery payload. Modification data was written to file %s", + logContext, file), e); + } } /** @@ -266,13 +287,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { */ void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException { if (payload instanceof CommitTransactionPayload) { - final Entry e = ((CommitTransactionPayload) payload).getCandidate(); + final Entry e = + ((CommitTransactionPayload) payload).getCandidate(); applyRecoveryCandidate(e.getValue()); allMetadataCommittedTransaction(e.getKey()); } else if (payload instanceof DataTreeCandidatePayload) { applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate()); } else { - LOG.warn("{}: ignoring unhandled payload {}", logContext, payload); + LOG.debug("{}: ignoring unhandled payload {}", logContext, payload); } } @@ -316,7 +338,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { */ if (payload instanceof CommitTransactionPayload) { if (identifier == null) { - final Entry e = ((CommitTransactionPayload) payload).getCandidate(); + final Entry e = + ((CommitTransactionPayload) payload).getCandidate(); applyReplicatedCandidate(e.getKey(), e.getValue()); allMetadataCommittedTransaction(e.getKey()); } else { @@ -346,11 +369,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private void allMetadataCommittedTransaction(final TransactionIdentifier txId) { for (ShardDataTreeMetadata m : metadata) { - m.transactionCommitted(txId); + m.onTransactionCommitted(txId); } } - private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier); if (chain == null) { chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this); @@ -377,6 +400,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId); } + @VisibleForTesting public void notifyListeners(final DataTreeCandidate candidate) { treeChangeListenerPublisher.publishChanges(candidate, logContext); dataChangeListenerPublisher.publishChanges(candidate, logContext); @@ -422,22 +446,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { Optional> registerChangeListener(final YangInstanceIdentifier path, final AsyncDataChangeListener> listener, final DataChangeScope scope) { - final DataChangeListenerRegistration>> reg = + DataChangeListenerRegistration>> reg = dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope); return new SimpleEntry<>(reg, readCurrentData()); } private Optional readCurrentData() { - final Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); + final Optional> currentState = + dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); } - public Entry, Optional> registerTreeChangeListener( - final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { - final ListenerRegistration reg = treeChangeListenerPublisher.registerTreeChangeListener( - path, listener); + public Entry, Optional> + registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { + final ListenerRegistration reg = + treeChangeListenerPublisher.registerTreeChangeListener(path, listener); return new SimpleEntry<>(reg, readCurrentData()); } @@ -456,22 +481,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeModification snapshot = transaction.getSnapshot(); snapshot.ready(); - return createReadyCohort(transaction.getId(), snapshot); + return createReadyCohort(transaction.getIdentifier(), snapshot); } public Optional> readNode(final YangInstanceIdentifier path) { return dataTree.takeSnapshot().readNode(path); } - public DataTreeSnapshot takeSnapshot() { + DataTreeSnapshot takeSnapshot() { return dataTree.takeSnapshot(); } + @VisibleForTesting public DataTreeModification newModification() { return dataTree.takeSnapshot().newModification(); } /** + * Commits a modification. + * * @deprecated This method violates DataTree containment and will be removed. */ @VisibleForTesting @@ -486,7 +514,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { public Collection getAndClearPendingTransactions() { Collection ret = new ArrayList<>(pendingTransactions.size()); - for(CommitEntry entry: pendingTransactions) { + for (CommitEntry entry: pendingTransactions) { ret.add(entry.cohort); } @@ -494,13 +522,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return ret; } + @SuppressWarnings("checkstyle:IllegalCatch") private void processNextTransaction() { while (!pendingTransactions.isEmpty()) { final CommitEntry entry = pendingTransactions.peek(); final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); - if(cohort.getState() != State.CAN_COMMIT_PENDING) { + if (cohort.getState() != State.CAN_COMMIT_PENDING) { break; } @@ -522,7 +551,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // For debugging purposes, allow dumping of the modification. Coupled with the above // precondition log, it should allow us to understand what went on. - LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree); + LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, + dataTree); cause = new TransactionCommitFailedException("Data did not pass validation.", e); } catch (Exception e) { LOG.warn("{}: Unexpected failure in validation phase", logContext, e); @@ -552,6 +582,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextTransaction(); } + @SuppressWarnings("checkstyle:IllegalCatch") void startPreCommit(final SimpleShardDataTreeCohort cohort) { final CommitEntry entry = pendingTransactions.peek(); Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort); @@ -583,6 +614,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextTransaction(); } + @SuppressWarnings("checkstyle:IllegalCatch") private void finishCommit(final SimpleShardDataTreeCohort cohort) { final TransactionIdentifier txId = cohort.getIdentifier(); final DataTreeCandidate candidate = cohort.getCandidate(); @@ -642,6 +674,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { cohortRegistry.process(sender, message); } + @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification modification) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId, @@ -650,6 +683,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return cohort; } + @SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED", "DB_DUPLICATE_SWITCH_CLAUSES"}, + justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = shard.ticker().read(); @@ -663,6 +698,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException()); break; case CAN_COMMIT_COMPLETE: + // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause + // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code + // in PRE_COMMIT_COMPLETE is changed. pendingTransactions.poll().cohort.reportFailure(new TimeoutException()); break; case PRE_COMMIT_PENDING: @@ -699,6 +737,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { case FAILED: case READY: default: + // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In + // this case, we just want to drop the current entry that expired and thus ignore the return value. + // In fact we really shouldn't hit this case but we handle all enums for completeness. pendingTransactions.poll(); } @@ -708,6 +749,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "See inline comment below.") void startAbort(final SimpleShardDataTreeCohort cohort) { final Iterator it = pendingTransactions.iterator(); if (!it.hasNext()) { @@ -721,6 +763,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { if (cohort.getState() != State.COMMIT_PENDING) { LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(), cohort.getIdentifier()); + + // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In + // this case, we've already obtained the head of the queue above via the Iterator and we just want to + // remove it here. pendingTransactions.poll(); processNextTransaction(); } else { @@ -748,12 +794,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } private void maybeRunOperationOnPendingTransactionsComplete() { - if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) { - LOG.debug("{}: Pending transactions complete - running operation {}", logContext, - runOnPendingTransactionsComplete); - - runOnPendingTransactionsComplete.run(); - runOnPendingTransactionsComplete = null; - } - } + if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) { + LOG.debug("{}: Pending transactions complete - running operation {}", logContext, + runOnPendingTransactionsComplete); + + runOnPendingTransactionsComplete.run(); + runOnPendingTransactionsComplete = null; + } + } }