X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=43a3f6dc49cb201ea34c167a6d57c5794740b6f8;hb=55a9b9f42a14c56060f74b38f84d444c0fbfecc4;hp=5ca0acba4d26e6df0ec2c99699a2ea244d878cfa;hpb=127042ea7e148d9dc0282acc3780b4754ca69e12;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index 5ca0acba4d..43a3f6dc49 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
@@ -7,14 +7,16 @@
*/
package org.opendaylight.controller.cluster.datastore;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
-import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
@@ -33,6 +35,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Queue;
import java.util.SortedSet;
import java.util.concurrent.TimeUnit;
@@ -40,7 +44,6 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
-import javax.annotation.concurrent.NotThreadSafe;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
@@ -87,21 +90,19 @@ import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
/**
- * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
- * e.g. it does not expose public interfaces and assumes it is only ever called from a
- * single thread.
+ * Internal shard state, similar to a DOMStore, but optimized for use in the actor system, e.g. it does not expose
+ * public interfaces and assumes it is only ever called from a single thread.
*
*
- * This class is not part of the API contract and is subject to change at any time.
+ * This class is not part of the API contract and is subject to change at any time. It is NOT thread-safe.
*/
-@NotThreadSafe
public class ShardDataTree extends ShardDataTreeTransactionParent {
private static final class CommitEntry {
final SimpleShardDataTreeCohort cohort;
long lastAccess;
CommitEntry(final SimpleShardDataTreeCohort cohort, final long now) {
- this.cohort = Preconditions.checkNotNull(cohort);
+ this.cohort = requireNonNull(cohort);
lastAccess = now;
}
@@ -157,12 +158,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final String logContext,
final ShardDataTreeMetadata>... metadata) {
- this.dataTree = Preconditions.checkNotNull(dataTree);
+ this.dataTree = requireNonNull(dataTree);
updateSchemaContext(schemaContext);
- this.shard = Preconditions.checkNotNull(shard);
- this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
- this.logContext = Preconditions.checkNotNull(logContext);
+ this.shard = requireNonNull(shard);
+ this.treeChangeListenerPublisher = requireNonNull(treeChangeListenerPublisher);
+ this.logContext = requireNonNull(logContext);
this.metadata = ImmutableList.copyOf(metadata);
tip = dataTree;
}
@@ -186,7 +187,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@VisibleForTesting
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
- this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
+ this(shard, schemaContext, treeType, YangInstanceIdentifier.empty(),
new DefaultShardDataTreeChangeListenerPublisher(""), "");
}
@@ -208,7 +209,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
void updateSchemaContext(final SchemaContext newSchemaContext) {
dataTree.setSchemaContext(newSchemaContext);
- this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
+ this.schemaContext = requireNonNull(newSchemaContext);
this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
@@ -222,7 +223,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @return A state snapshot
*/
@NonNull ShardDataTreeSnapshot takeStateSnapshot() {
- final NormalizedNode, ?> rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get();
+ final NormalizedNode, ?> rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
final Builder>, ShardDataTreeSnapshotMetadata>> metaBuilder =
ImmutableMap.builder();
@@ -266,12 +267,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
final DataTreeModification mod = wrapper.apply(dataTree.takeSnapshot().newModification());
// delete everything first
- mod.delete(YangInstanceIdentifier.EMPTY);
+ mod.delete(YangInstanceIdentifier.empty());
- final java.util.Optional> maybeNode = snapshot.getRootNode();
+ final Optional> maybeNode = snapshot.getRootNode();
if (maybeNode.isPresent()) {
// Add everything from the remote node back
- mod.write(YangInstanceIdentifier.EMPTY, maybeNode.get());
+ mod.write(YangInstanceIdentifier.empty(), maybeNode.get());
}
mod.ready();
@@ -318,9 +319,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private void applyRecoveryCandidate(final DataTreeCandidate candidate) {
+ private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
+ final Entry entry = payload.getCandidate();
+
final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
- DataTreeCandidates.applyToModification(mod, candidate);
+ DataTreeCandidates.applyToModification(mod, entry.getValue());
mod.ready();
final DataTreeModification unwrapped = mod.delegate();
@@ -337,6 +340,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
"%s: Failed to apply recovery payload. Modification data was written to file %s",
logContext, file), e);
}
+
+ allMetadataCommittedTransaction(entry.getKey());
}
/**
@@ -349,10 +354,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
*/
void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
if (payload instanceof CommitTransactionPayload) {
- final Entry e =
- ((CommitTransactionPayload) payload).getCandidate();
- applyRecoveryCandidate(e.getValue());
- allMetadataCommittedTransaction(e.getKey());
+ applyRecoveryCandidate((CommitTransactionPayload) payload);
} else if (payload instanceof AbortTransactionPayload) {
allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
} else if (payload instanceof PurgeTransactionPayload) {
@@ -368,12 +370,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
}
- private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign)
- throws DataValidationFailedException {
+ private void applyReplicatedCandidate(final CommitTransactionPayload payload)
+ throws DataValidationFailedException, IOException {
+ final Entry entry = payload.getCandidate();
+ final TransactionIdentifier identifier = entry.getKey();
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
final DataTreeModification mod = dataTree.takeSnapshot().newModification();
- DataTreeCandidates.applyToModification(mod, foreign);
+ DataTreeCandidates.applyToModification(mod, entry.getValue());
mod.ready();
LOG.trace("{}: Applying foreign modification {}", logContext, mod);
@@ -409,11 +413,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
*/
if (payload instanceof CommitTransactionPayload) {
if (identifier == null) {
- final Entry e =
- ((CommitTransactionPayload) payload).getCandidate();
- applyReplicatedCandidate(e.getKey(), e.getValue());
+ applyReplicatedCandidate((CommitTransactionPayload) payload);
} else {
- Verify.verify(identifier instanceof TransactionIdentifier);
+ verify(identifier instanceof TransactionIdentifier);
payloadReplicationComplete((TransactionIdentifier) identifier);
}
} else if (payload instanceof AbortTransactionPayload) {
@@ -529,8 +531,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
final boolean closed) {
final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
- Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId,
- existing);
+ checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, existing);
return ret;
}
@@ -550,6 +551,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+ shard.getShardMBean().incrementReadOnlyTransactionCount();
+
if (txId.getHistoryId().getHistoryId() == 0) {
return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
}
@@ -558,6 +561,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+ shard.getShardMBean().incrementReadWriteTransactionCount();
+
if (txId.getHistoryId().getHistoryId() == 0) {
return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot()
.newModification());
@@ -591,18 +596,33 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
* @param callback Callback to invoke upon completion, may be null
*/
void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
+ if (commonCloseTransactionChain(id, callback)) {
+ replicatePayload(id, CloseLocalHistoryPayload.create(id,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+ }
+ }
+
+ /**
+ * Close a single transaction chain which is received through ask-based protocol. It does not keep a commit record.
+ *
+ * @param id History identifier
+ */
+ void closeTransactionChain(final LocalHistoryIdentifier id) {
+ commonCloseTransactionChain(id, null);
+ }
+
+ private boolean commonCloseTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
final ShardDataTreeTransactionChain chain = transactionChains.get(id);
if (chain == null) {
LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id);
if (callback != null) {
callback.run();
}
- return;
+ return false;
}
chain.close();
- replicatePayload(id, CloseLocalHistoryPayload.create(
- id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+ return true;
}
/**
@@ -626,10 +646,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
Optional readCurrentData() {
- final java.util.Optional> currentState =
- dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
- return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
- YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent();
+ return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
+ .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
}
public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
@@ -658,9 +676,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@Override
ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
- final java.util.Optional> participatingShardNames) {
+ final Optional> participatingShardNames) {
final DataTreeModification snapshot = transaction.getSnapshot();
+ final TransactionIdentifier id = transaction.getIdentifier();
+ LOG.debug("{}: readying transaction {}", logContext, id);
snapshot.ready();
+ LOG.debug("{}: transaction {} ready", logContext, id);
return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
}
@@ -672,7 +693,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
}
public Optional> readNode(final YangInstanceIdentifier path) {
- return Optional.fromJavaUtil(dataTree.takeSnapshot().readNode(path));
+ return dataTree.takeSnapshot().readNode(path);
}
DataTreeSnapshot takeSnapshot() {
@@ -745,8 +766,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
- LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", logContext, cohort.getIdentifier(),
- modification, dataTree);
+ LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.getIdentifier(), modification);
+ LOG.trace("{}: Current tree: {}", logContext, dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
} catch (Exception e) {
LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
@@ -898,8 +919,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
tempStack.forEach(queue::addFirst);
}
- private Collection extractPrecedingShardNames(
- final java.util.Optional> participatingShardNames) {
+ private Collection extractPrecedingShardNames(final Optional> participatingShardNames) {
return participatingShardNames.map((Function, Collection>)
set -> set.headSet(shard.getShardName())).orElse(Collections.emptyList());
}
@@ -913,17 +933,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
- Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
+ checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
final SimpleShardDataTreeCohort current = entry.cohort;
- Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
+ verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
- LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier());
+ final TransactionIdentifier currentId = current.getIdentifier();
+ LOG.debug("{}: Preparing transaction {}", logContext, currentId);
final DataTreeCandidateTip candidate;
try {
candidate = tip.prepare(cohort.getDataTreeModification());
- } catch (RuntimeException e) {
+ LOG.debug("{}: Transaction {} candidate ready", logContext, currentId);
+ } catch (DataValidationFailedException | RuntimeException e) {
failPreCommit(e);
return;
}
@@ -932,14 +954,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@Override
public void onSuccess(final Void noop) {
// Set the tip of the data tree.
- tip = Verify.verifyNotNull(candidate);
+ tip = verifyNotNull(candidate);
entry.lastAccess = readTime();
pendingTransactions.remove();
pendingCommits.add(entry);
- LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+ LOG.debug("{}: Transaction {} prepared", logContext, currentId);
cohort.successfulPreCommit(candidate);
@@ -994,7 +1016,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
final CommitEntry entry = pendingCommits.peek();
- Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
+ checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
final SimpleShardDataTreeCohort current = entry.cohort;
if (!cohort.equals(current)) {
@@ -1064,7 +1086,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
- final java.util.Optional> participatingShardNames) {
+ final Optional> participatingShardNames) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
COMMIT_STEP_TIMEOUT), participatingShardNames);
@@ -1075,7 +1097,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
// Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
// the newReadWriteTransaction()
ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
- final java.util.Optional> participatingShardNames) {
+ final Optional> participatingShardNames) {
if (txId.getHistoryId().getHistoryId() == 0) {
return createReadyCohort(txId, mod, participatingShardNames);
}
@@ -1085,7 +1107,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
- final Function> accessTimeUpdater) {
+ final Function accessTimeUpdater) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = readTime();
@@ -1103,9 +1125,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
return;
}
- final Optional updateOpt = accessTimeUpdater.apply(currentTx.cohort);
+ final OptionalLong updateOpt = accessTimeUpdater.apply(currentTx.cohort);
if (updateOpt.isPresent()) {
- final long newAccess = updateOpt.get().longValue();
+ final long newAccess = updateOpt.getAsLong();
final long newDelta = now - newAccess;
if (newDelta < delta) {
LOG.debug("{}: Updated current transaction {} access time", logContext,
@@ -1234,7 +1256,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
@SuppressWarnings("checkstyle:IllegalCatch")
private void rebaseTransactions(final Iterator iter, final @NonNull DataTreeTip newTip) {
- tip = Preconditions.checkNotNull(newTip);
+ tip = requireNonNull(newTip);
while (iter.hasNext()) {
final SimpleShardDataTreeCohort cohort = iter.next().cohort;
if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {