X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=5b015ead9acee3034600a86c6bad4a93ea4c62b5;hb=refs%2Fchanges%2F86%2F48686%2F16;hp=f1d37872fd20262ae4e3c114f8ad96c6cf7e7e75;hpb=6276a65120a674b545ea787a5e1d9311bcdbf2af;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index f1d37872fd..5b015ead9a 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
@@ -13,11 +13,14 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.primitives.UnsignedLong;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
@@ -42,6 +45,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.CommitTransaction
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
@@ -75,6 +79,7 @@ import scala.concurrent.duration.Duration;
* e.g. it does not expose public interfaces and assumes it is only ever called from a
* single thread.
*
+ *
* This class is not part of the API contract and is subject to change at any time.
*/
@NotThreadSafe
@@ -132,10 +137,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
new DefaultShardDataChangeListenerPublisher(), "");
}
- String logContext() {
+ final String logContext() {
return logContext;
}
+ final Ticker ticker() {
+ return shard.ticker();
+ }
+
public TipProducingDataTree getDataTree() {
return dataTree;
}
@@ -144,9 +153,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return schemaContext;
}
- void updateSchemaContext(final SchemaContext schemaContext) {
- dataTree.setSchemaContext(schemaContext);
- this.schemaContext = Preconditions.checkNotNull(schemaContext);
+ void updateSchemaContext(final SchemaContext newSchemaContext) {
+ dataTree.setSchemaContext(newSchemaContext);
+ this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
}
/**
@@ -160,7 +169,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
ImmutableMap.builder();
for (ShardDataTreeMetadata> m : metadata) {
- final ShardDataTreeSnapshotMetadata> meta = m.toStapshot();
+ final ShardDataTreeSnapshotMetadata> meta = m.toSnapshot();
if (meta != null) {
metaBuilder.put(meta.getType(), meta);
}
@@ -169,7 +178,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build());
}
- private void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot,
+ private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot,
final UnaryOperator wrapper) throws DataValidationFailedException {
final Stopwatch elapsed = Stopwatch.createStarted();
@@ -206,10 +215,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
final DataTreeModification unwrapped = unwrap(mod);
dataTree.validate(unwrapped);
- dataTree.commit(dataTree.prepare(unwrapped));
+ DataTreeCandidateTip candidate = dataTree.prepare(unwrapped);
+ dataTree.commit(candidate);
+ notifyListeners(candidate);
+
LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
}
+ /**
+ * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
+ * does not perform any pruning.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ applySnapshot(snapshot, UnaryOperator.identity());
+ }
+
private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
return new PruningDataTreeModification(delegate, dataTree, schemaContext);
}
@@ -232,18 +255,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
applySnapshot(snapshot, this::wrapWithPruning);
}
-
- /**
- * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
- * does not perform any pruning.
- *
- * @param snapshot Snapshot that needs to be applied
- * @throws DataValidationFailedException when the snapshot fails to apply
- */
- void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
- applySnapshot(snapshot, UnaryOperator.identity());
- }
-
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
DataTreeCandidates.applyToModification(mod, candidate);
@@ -252,8 +264,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
final DataTreeModification unwrapped = mod.delegate();
LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
- dataTree.validate(unwrapped);
- dataTree.commit(dataTree.prepare(unwrapped));
+ try {
+ dataTree.validate(unwrapped);
+ dataTree.commit(dataTree.prepare(unwrapped));
+ } catch (Exception e) {
+ File file = new File(System.getProperty("karaf.data", "."),
+ "failed-recovery-payload-" + logContext + ".out");
+ DataTreeModificationOutput.toFile(file, unwrapped);
+ throw new IllegalStateException(String.format(
+ "%s: Failed to apply recovery payload. Modification data was written to file %s",
+ logContext, file), e);
+ }
}
/**
@@ -266,13 +287,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
*/
void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
if (payload instanceof CommitTransactionPayload) {
- final Entry e = ((CommitTransactionPayload) payload).getCandidate();
+ final Entry e =
+ ((CommitTransactionPayload) payload).getCandidate();
applyRecoveryCandidate(e.getValue());
allMetadataCommittedTransaction(e.getKey());
} else if (payload instanceof DataTreeCandidatePayload) {
applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
} else {
- LOG.warn("{}: ignoring unhandled payload {}", logContext, payload);
+ LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
}
}
@@ -316,7 +338,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
*/
if (payload instanceof CommitTransactionPayload) {
if (identifier == null) {
- final Entry e = ((CommitTransactionPayload) payload).getCandidate();
+ final Entry e =
+ ((CommitTransactionPayload) payload).getCandidate();
applyReplicatedCandidate(e.getKey(), e.getValue());
allMetadataCommittedTransaction(e.getKey());
} else {
@@ -346,11 +369,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
for (ShardDataTreeMetadata> m : metadata) {
- m.transactionCommitted(txId);
+ m.onTransactionCommitted(txId);
}
}
- private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
+ ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
if (chain == null) {
chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
@@ -377,6 +400,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
}
+ @VisibleForTesting
public void notifyListeners(final DataTreeCandidate candidate) {
treeChangeListenerPublisher.publishChanges(candidate, logContext);
dataChangeListenerPublisher.publishChanges(candidate, logContext);
@@ -422,22 +446,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
Optional> registerChangeListener(final YangInstanceIdentifier path,
final AsyncDataChangeListener> listener,
final DataChangeScope scope) {
- final DataChangeListenerRegistration>> reg =
+ DataChangeListenerRegistration>> reg =
dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
return new SimpleEntry<>(reg, readCurrentData());
}
private Optional readCurrentData() {
- final Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
+ final Optional> currentState =
+ dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent();
}
- public Entry, Optional> registerTreeChangeListener(
- final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
- final ListenerRegistration reg = treeChangeListenerPublisher.registerTreeChangeListener(
- path, listener);
+ public Entry, Optional>
+ registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
+ final ListenerRegistration reg =
+ treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
return new SimpleEntry<>(reg, readCurrentData());
}
@@ -456,22 +481,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
final DataTreeModification snapshot = transaction.getSnapshot();
snapshot.ready();
- return createReadyCohort(transaction.getId(), snapshot);
+ return createReadyCohort(transaction.getIdentifier(), snapshot);
}
public Optional> readNode(final YangInstanceIdentifier path) {
return dataTree.takeSnapshot().readNode(path);
}
- public DataTreeSnapshot takeSnapshot() {
+ DataTreeSnapshot takeSnapshot() {
return dataTree.takeSnapshot();
}
+ @VisibleForTesting
public DataTreeModification newModification() {
return dataTree.takeSnapshot().newModification();
}
/**
+ * Commits a modification.
+ *
* @deprecated This method violates DataTree containment and will be removed.
*/
@VisibleForTesting
@@ -486,7 +514,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
public Collection getAndClearPendingTransactions() {
Collection ret = new ArrayList<>(pendingTransactions.size());
- for(CommitEntry entry: pendingTransactions) {
+ for (CommitEntry entry: pendingTransactions) {
ret.add(entry.cohort);
}
@@ -494,13 +522,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return ret;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void processNextTransaction() {
while (!pendingTransactions.isEmpty()) {
final CommitEntry entry = pendingTransactions.peek();
final SimpleShardDataTreeCohort cohort = entry.cohort;
final DataTreeModification modification = cohort.getDataTreeModification();
- if(cohort.getState() != State.CAN_COMMIT_PENDING) {
+ if (cohort.getState() != State.CAN_COMMIT_PENDING) {
break;
}
@@ -522,7 +551,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
- LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree);
+ LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
+ dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation.", e);
} catch (Exception e) {
LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
@@ -552,6 +582,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
processNextTransaction();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
@@ -583,6 +614,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
processNextTransaction();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void finishCommit(final SimpleShardDataTreeCohort cohort) {
final TransactionIdentifier txId = cohort.getIdentifier();
final DataTreeCandidate candidate = cohort.getCandidate();
@@ -642,6 +674,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
cohortRegistry.process(sender, message);
}
+ @Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
final DataTreeModification modification) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
@@ -650,6 +683,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return cohort;
}
+ @SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED", "DB_DUPLICATE_SWITCH_CLAUSES"},
+ justification = "See inline comments below.")
void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = shard.ticker().read();
@@ -663,6 +698,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
break;
case CAN_COMMIT_COMPLETE:
+ // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
+ // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
+ // in PRE_COMMIT_COMPLETE is changed.
pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
break;
case PRE_COMMIT_PENDING:
@@ -699,6 +737,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
case FAILED:
case READY:
default:
+ // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In
+ // this case, we just want to drop the current entry that expired and thus ignore the return value.
+ // In fact we really shouldn't hit this case but we handle all enums for completeness.
pendingTransactions.poll();
}
@@ -708,6 +749,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "See inline comment below.")
void startAbort(final SimpleShardDataTreeCohort cohort) {
final Iterator it = pendingTransactions.iterator();
if (!it.hasNext()) {
@@ -721,6 +763,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
if (cohort.getState() != State.COMMIT_PENDING) {
LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
cohort.getIdentifier());
+
+ // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In
+ // this case, we've already obtained the head of the queue above via the Iterator and we just want to
+ // remove it here.
pendingTransactions.poll();
processNextTransaction();
} else {
@@ -748,12 +794,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
private void maybeRunOperationOnPendingTransactionsComplete() {
- if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
- LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
- runOnPendingTransactionsComplete);
-
- runOnPendingTransactionsComplete.run();
- runOnPendingTransactionsComplete = null;
- }
- }
+ if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+ LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+ runOnPendingTransactionsComplete);
+
+ runOnPendingTransactionsComplete.run();
+ runOnPendingTransactionsComplete = null;
+ }
+ }
}