X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=c316fdfb9e6b768b2ab5850c1068957527b36da5;hp=e8b41d97474f49951b8b67846d22c7a06b73d185;hb=5a824a836a00ff21a855d7339a2fa9b64678db52;hpb=52725324973f22ac0c85ed4fd8459cf0ef504407 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index e8b41d9747..c316fdfb9e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -7,14 +7,17 @@ */ package org.opendaylight.controller.cluster.datastore; +import static akka.actor.ActorRef.noSender; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; @@ -27,39 +30,45 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; +import java.util.OptionalLong; import java.util.Queue; +import java.util.SortedSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.cluster.datastore.node.utils.transformer.ReusableNormalizedNodePruner; import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload; +import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion; import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; @@ -77,34 +86,41 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; /** - * Internal shard state, similar to a DOMStore, but optimized for use in the actor system, - * e.g. it does not expose public interfaces and assumes it is only ever called from a - * single thread. + * Internal shard state, similar to a DOMStore, but optimized for use in the actor system, e.g. it does not expose + * public interfaces and assumes it is only ever called from a single thread. * *

- * This class is not part of the API contract and is subject to change at any time. + * This class is not part of the API contract and is subject to change at any time. It is NOT thread-safe. */ -@NotThreadSafe public class ShardDataTree extends ShardDataTreeTransactionParent { private static final class CommitEntry { final SimpleShardDataTreeCohort cohort; long lastAccess; CommitEntry(final SimpleShardDataTreeCohort cohort, final long now) { - this.cohort = Preconditions.checkNotNull(cohort); + this.cohort = requireNonNull(cohort); lastAccess = now; } + + @Override + public String toString() { + return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]"; + } } - private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(FiniteDuration.create(5, TimeUnit.SECONDS)); private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); /** @@ -117,7 +133,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final Map transactionChains = new HashMap<>(); private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry(); - private final Queue pendingTransactions = new ArrayDeque<>(); + private final Deque pendingTransactions = new ArrayDeque<>(); private final Queue pendingCommits = new ArrayDeque<>(); private final Queue pendingFinishCommits = new ArrayDeque<>(); @@ -127,7 +143,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final Map replicationCallbacks = new HashMap<>(); private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; - private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; private final Collection> metadata; private final DataTree dataTree; private final String logContext; @@ -143,31 +158,30 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private DataTreeTip tip; private SchemaContext schemaContext; + private DataSchemaContextTree dataSchemaContext; private int currentTransactionBatch; - ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree, + ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final DataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, - final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, + final String logContext, final ShardDataTreeMetadata... metadata) { - this.dataTree = Preconditions.checkNotNull(dataTree); + this.dataTree = requireNonNull(dataTree); updateSchemaContext(schemaContext); - this.shard = Preconditions.checkNotNull(shard); - this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher); - this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher); - this.logContext = Preconditions.checkNotNull(logContext); + this.shard = requireNonNull(shard); + this.treeChangeListenerPublisher = requireNonNull(treeChangeListenerPublisher); + this.logContext = requireNonNull(logContext); this.metadata = ImmutableList.copyOf(metadata); tip = dataTree; } - ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, + ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType, final YangInstanceIdentifier root, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, - final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, + final String logContext, final ShardDataTreeMetadata... metadata) { - this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher, - dataChangeListenerPublisher, logContext, metadata); + this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher, logContext, metadata); } private static DataTree createDataTree(final TreeType treeType, final YangInstanceIdentifier root) { @@ -180,10 +194,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @VisibleForTesting - public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { - this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, - new DefaultShardDataTreeChangeListenerPublisher(""), - new DefaultShardDataChangeListenerPublisher(""), ""); + public ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType) { + this(shard, schemaContext, treeType, YangInstanceIdentifier.empty(), + new DefaultShardDataTreeChangeListenerPublisher(""), ""); } final String logContext() { @@ -202,9 +215,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return schemaContext; } - void updateSchemaContext(final SchemaContext newSchemaContext) { - dataTree.setSchemaContext(newSchemaContext); - this.schemaContext = Preconditions.checkNotNull(newSchemaContext); + void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) { + dataTree.setEffectiveModelContext(newSchemaContext); + this.schemaContext = newSchemaContext; + this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); } void resetTransactionBatch() { @@ -216,8 +230,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * * @return A state snapshot */ - @Nonnull ShardDataTreeSnapshot takeStateSnapshot() { - final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get(); + @NonNull ShardDataTreeSnapshot takeStateSnapshot() { + final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get(); final Builder>, ShardDataTreeSnapshotMetadata> metaBuilder = ImmutableMap.builder(); @@ -235,7 +249,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return !pendingTransactions.isEmpty() || !pendingCommits.isEmpty() || !pendingFinishCommits.isEmpty(); } - private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot, + private void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot, final UnaryOperator wrapper) throws DataValidationFailedException { final Stopwatch elapsed = Stopwatch.createStarted(); @@ -259,18 +273,18 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - final DataTreeModification mod = wrapper.apply(dataTree.takeSnapshot().newModification()); + final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification(); + final DataTreeModification mod = wrapper.apply(unwrapped); // delete everything first - mod.delete(YangInstanceIdentifier.EMPTY); + mod.delete(YangInstanceIdentifier.empty()); - final java.util.Optional> maybeNode = snapshot.getRootNode(); + final Optional> maybeNode = snapshot.getRootNode(); if (maybeNode.isPresent()) { // Add everything from the remote node back - mod.write(YangInstanceIdentifier.EMPTY, maybeNode.get()); + mod.write(YangInstanceIdentifier.empty(), maybeNode.get()); } mod.ready(); - final DataTreeModification unwrapped = unwrap(mod); dataTree.validate(unwrapped); DataTreeCandidateTip candidate = dataTree.prepare(unwrapped); dataTree.commit(candidate); @@ -286,21 +300,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + // TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation applySnapshot(snapshot, UnaryOperator.identity()); } - private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) { - return new PruningDataTreeModification(delegate, dataTree, schemaContext); - } - - private static DataTreeModification unwrap(final DataTreeModification modification) { - if (modification instanceof PruningDataTreeModification) { - return ((PruningDataTreeModification)modification).delegate(); - } - return modification; - } - /** * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data * pruning in an attempt to adjust the state to our current SchemaContext. @@ -308,17 +312,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoverySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { - applySnapshot(snapshot, this::wrapWithPruning); + void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException { + // TODO: we should be able to reuse the pruner, provided we are not reentrant + final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext( + dataSchemaContext); + if (snapshot.needsMigration()) { + final ReusableNormalizedNodePruner uintPruner = pruner.withUintAdaption(); + applySnapshot(snapshot.getSnapshot(), + delegate -> new PruningDataTreeModification.Proactive(delegate, dataTree, uintPruner)); + } else { + applySnapshot(snapshot.getSnapshot(), + delegate -> new PruningDataTreeModification.Reactive(delegate, dataTree, pruner)); + } } @SuppressWarnings("checkstyle:IllegalCatch") - private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException { - final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification()); - DataTreeCandidates.applyToModification(mod, candidate); - mod.ready(); + private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException { + final Entry entry = payload.acquireCandidate(); + final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification(); + final PruningDataTreeModification mod = createPruningModification(unwrapped, + NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0); - final DataTreeModification unwrapped = mod.delegate(); + DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate()); + mod.ready(); LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped); try { @@ -332,6 +348,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { "%s: Failed to apply recovery payload. Modification data was written to file %s", logContext, file), e); } + + allMetadataCommittedTransaction(entry.getKey()); + } + + private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped, + final boolean uintAdapting) { + // TODO: we should be able to reuse the pruner, provided we are not reentrant + final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext( + dataSchemaContext); + return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption()) + : new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner); } /** @@ -342,12 +369,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @throws IOException when the snapshot fails to deserialize * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException, DataValidationFailedException { + void applyRecoveryPayload(final @NonNull Payload payload) throws IOException { if (payload instanceof CommitTransactionPayload) { - final Entry e = - ((CommitTransactionPayload) payload).getCandidate(); - applyRecoveryCandidate(e.getValue()); - allMetadataCommittedTransaction(e.getKey()); + applyRecoveryCandidate((CommitTransactionPayload) payload); } else if (payload instanceof AbortTransactionPayload) { allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); } else if (payload instanceof PurgeTransactionPayload) { @@ -363,12 +387,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign) - throws DataValidationFailedException { + private void applyReplicatedCandidate(final CommitTransactionPayload payload) + throws DataValidationFailedException, IOException { + final Entry entry = payload.acquireCandidate(); + final TransactionIdentifier identifier = entry.getKey(); LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); final DataTreeModification mod = dataTree.takeSnapshot().newModification(); - DataTreeCandidates.applyToModification(mod, foreign); + // TODO: check version here, which will enable us to perform forward-compatibility transformations + DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate()); mod.ready(); LOG.trace("{}: Applying foreign modification {}", logContext, mod); @@ -376,6 +403,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeCandidate candidate = dataTree.prepare(mod); dataTree.commit(candidate); + allMetadataCommittedTransaction(identifier); notifyListeners(candidate); } @@ -402,18 +430,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * pre-Boron state -- which limits the number of options here. */ if (payload instanceof CommitTransactionPayload) { - final TransactionIdentifier txId; if (identifier == null) { - final Entry e = - ((CommitTransactionPayload) payload).getCandidate(); - txId = e.getKey(); - applyReplicatedCandidate(txId, e.getValue()); + applyReplicatedCandidate((CommitTransactionPayload) payload); } else { - Verify.verify(identifier instanceof TransactionIdentifier); - txId = (TransactionIdentifier) identifier; - payloadReplicationComplete(txId); + verify(identifier instanceof TransactionIdentifier); + // if we did not track this transaction before, it means that it came from another leader and we are in + // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to + // the local DataTree and would be lost if it was only applied via payloadReplicationComplete(). + if (!payloadReplicationComplete((TransactionIdentifier) identifier)) { + applyReplicatedCandidate((CommitTransactionPayload) payload); + } } - allMetadataCommittedTransaction(txId); + + // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed. + checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue() + .getCandidate()); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); @@ -444,7 +475,30 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) { + private void checkRootOverwrite(DataTreeCandidate candidate) { + final DatastoreContext datastoreContext = shard.getDatastoreContext(); + if (!datastoreContext.isSnapshotOnRootOverwrite()) { + return; + } + + if (!datastoreContext.isPersistent()) { + return; + } + + if (candidate.getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) { + return; + } + + // top level container ie "/" + if ((candidate.getRootPath().equals(YangInstanceIdentifier.empty()) + && candidate.getRootNode().getModificationType().equals(ModificationType.WRITE))) { + LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext); + shard.self().tell(new InitiateCaptureSnapshot(), noSender()); + return; + } + } + + private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) { if (callback != null) { replicationCallbacks.put(payload, callback); } @@ -461,20 +515,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private void payloadReplicationComplete(final TransactionIdentifier txId) { + private boolean payloadReplicationComplete(final TransactionIdentifier txId) { final CommitEntry current = pendingFinishCommits.peek(); if (current == null) { LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); - return; + allMetadataCommittedTransaction(txId); + return false; } if (!current.cohort.getIdentifier().equals(txId)) { LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext, current.cohort.getIdentifier(), txId); - return; + allMetadataCommittedTransaction(txId); + return false; } finishCommit(current.cohort); + return true; } private void allMetadataAbortedTransaction(final TransactionIdentifier txId) { @@ -525,18 +582,18 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final boolean closed) { final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this); final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret); - Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, - existing); + checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, existing); return ret; } ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId, - @Nullable final Runnable callback) { + final @Nullable Runnable callback) { ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { chain = new ShardDataTreeTransactionChain(historyId, this); transactionChains.put(historyId, chain); - replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback); + replicatePayload(historyId, CreateLocalHistoryPayload.create( + historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } else if (callback != null) { callback.run(); } @@ -545,6 +602,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) { + shard.getShardMBean().incrementReadOnlyTransactionCount(); + if (txId.getHistoryId().getHistoryId() == 0) { return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot()); } @@ -553,6 +612,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) { + shard.getShardMBean().incrementReadWriteTransactionCount(); + if (txId.getHistoryId().getHistoryId() == 0) { return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot() .newModification()); @@ -564,7 +625,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public void notifyListeners(final DataTreeCandidate candidate) { treeChangeListenerPublisher.publishChanges(candidate); - dataChangeListenerPublisher.publishChanges(candidate); } /** @@ -586,18 +646,34 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param id History identifier * @param callback Callback to invoke upon completion, may be null */ - void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) { + void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { + if (commonCloseTransactionChain(id, callback)) { + replicatePayload(id, CloseLocalHistoryPayload.create(id, + shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); + } + } + + /** + * Close a single transaction chain which is received through ask-based protocol. It does not keep a commit record. + * + * @param id History identifier + */ + void closeTransactionChain(final LocalHistoryIdentifier id) { + commonCloseTransactionChain(id, null); + } + + private boolean commonCloseTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { final ShardDataTreeTransactionChain chain = transactionChains.get(id); if (chain == null) { LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id); if (callback != null) { callback.run(); } - return; + return false; } chain.close(); - replicatePayload(id, CloseLocalHistoryPayload.create(id), callback); + return true; } /** @@ -606,7 +682,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param id History identifier * @param callback Callback to invoke upon completion, may be null */ - void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) { + void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { final ShardDataTreeTransactionChain chain = transactionChains.remove(id); if (chain == null) { LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id); @@ -616,22 +692,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } - replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); - } - - void registerDataChangeListener(final YangInstanceIdentifier path, - final AsyncDataChangeListener> listener, - final DataChangeScope scope, final Optional initialState, - final Consumer>>> - onRegistration) { - dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration); + replicatePayload(id, PurgeLocalHistoryPayload.create( + id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } Optional readCurrentData() { - final java.util.Optional> currentState = - dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); - return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( - YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); + return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()) + .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state)); } public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, @@ -648,7 +715,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { void abortTransaction(final AbstractShardDataTreeTransaction transaction, final Runnable callback) { final TransactionIdentifier id = transaction.getIdentifier(); LOG.debug("{}: aborting transaction {}", logContext, id); - replicatePayload(id, AbortTransactionPayload.create(id), callback); + replicatePayload(id, AbortTransactionPayload.create( + id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } @Override @@ -658,20 +726,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction, + final Optional> participatingShardNames) { final DataTreeModification snapshot = transaction.getSnapshot(); + final TransactionIdentifier id = transaction.getIdentifier(); + LOG.debug("{}: readying transaction {}", logContext, id); snapshot.ready(); + LOG.debug("{}: transaction {} ready", logContext, id); - return createReadyCohort(transaction.getIdentifier(), snapshot); + return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames); } void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { LOG.debug("{}: purging transaction {}", logContext, id); - replicatePayload(id, PurgeTransactionPayload.create(id), callback); + replicatePayload(id, PurgeTransactionPayload.create( + id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } public Optional> readNode(final YangInstanceIdentifier path) { - return Optional.fromJavaUtil(dataTree.takeSnapshot().readNode(path)); + return dataTree.takeSnapshot().readNode(path); } DataTreeSnapshot takeSnapshot() { @@ -744,8 +817,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // For debugging purposes, allow dumping of the modification. Coupled with the above // precondition log, it should allow us to understand what went on. - LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, - dataTree); + LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.getIdentifier(), modification); + LOG.trace("{}: Current tree: {}", logContext, dataTree); cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e); } catch (Exception e) { LOG.warn("{}: Unexpected failure in validation phase", logContext, e); @@ -801,13 +874,107 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } if (!cohort.equals(head.cohort)) { - LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier()); - return; + // The tx isn't at the head of the queue so we can't start canCommit at this point. Here we check if this + // tx should be moved ahead of other tx's in the READY state in the pendingTransactions queue. If this tx + // has other participating shards, it could deadlock with other tx's accessing the same shards + // depending on the order the tx's are readied on each shard + // (see https://jira.opendaylight.org/browse/CONTROLLER-1836). Therefore, if the preceding participating + // shard names for a preceding pending tx, call it A, in the queue matches that of this tx, then this tx + // is allowed to be moved ahead of tx A in the queue so it is processed first to avoid potential deadlock + // if tx A is behind this tx in the pendingTransactions queue for a preceding shard. In other words, since + // canCommmit for this tx was requested before tx A, honor that request. If this tx is moved to the head of + // the queue as a result, then proceed with canCommit. + + Collection precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames()); + if (precedingShardNames.isEmpty()) { + LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier()); + return; + } + + LOG.debug("{}: Evaluating tx {} for canCommit - preceding participating shard names {}", + logContext, cohort.getIdentifier(), precedingShardNames); + final Iterator iter = pendingTransactions.iterator(); + int index = -1; + int moveToIndex = -1; + while (iter.hasNext()) { + final CommitEntry entry = iter.next(); + ++index; + + if (cohort.equals(entry.cohort)) { + if (moveToIndex < 0) { + LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit", + logContext, cohort.getIdentifier()); + return; + } + + LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue", + logContext, cohort.getIdentifier(), moveToIndex); + iter.remove(); + insertEntry(pendingTransactions, entry, moveToIndex); + + if (!cohort.equals(pendingTransactions.peek().cohort)) { + LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit", + logContext, cohort.getIdentifier()); + return; + } + + LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit", + logContext, cohort.getIdentifier()); + break; + } + + if (entry.cohort.getState() != State.READY) { + LOG.debug("{}: Skipping pending transaction {} in state {}", + logContext, entry.cohort.getIdentifier(), entry.cohort.getState()); + continue; + } + + final Collection pendingPrecedingShardNames = extractPrecedingShardNames( + entry.cohort.getParticipatingShardNames()); + + if (precedingShardNames.equals(pendingPrecedingShardNames)) { + if (moveToIndex < 0) { + LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}", + logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index); + moveToIndex = index; + } else { + LOG.debug( + "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}", + logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex); + } + } else { + LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping", + logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier()); + } + } } processNextPendingTransaction(); } + private static void insertEntry(final Deque queue, final CommitEntry entry, final int atIndex) { + if (atIndex == 0) { + queue.addFirst(entry); + return; + } + + LOG.trace("Inserting into Deque at index {}", atIndex); + + Deque tempStack = new ArrayDeque<>(atIndex); + for (int i = 0; i < atIndex; i++) { + tempStack.push(queue.poll()); + } + + queue.addFirst(entry); + + tempStack.forEach(queue::addFirst); + } + + private Collection extractPrecedingShardNames(final Optional> participatingShardNames) { + return participatingShardNames.map((Function, Collection>) + set -> set.headSet(shard.getShardName())).orElse(Collections.emptyList()); + } + private void failPreCommit(final Throwable cause) { shard.getShardMBean().incrementFailedTransactionsCount(); pendingTransactions.poll().cohort.failedPreCommit(cause); @@ -817,17 +984,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @SuppressWarnings("checkstyle:IllegalCatch") void startPreCommit(final SimpleShardDataTreeCohort cohort) { final CommitEntry entry = pendingTransactions.peek(); - Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort); + checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort); final SimpleShardDataTreeCohort current = entry.cohort; - Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); + verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); - LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier()); + final TransactionIdentifier currentId = current.getIdentifier(); + LOG.debug("{}: Preparing transaction {}", logContext, currentId); final DataTreeCandidateTip candidate; try { candidate = tip.prepare(cohort.getDataTreeModification()); - } catch (RuntimeException e) { + LOG.debug("{}: Transaction {} candidate ready", logContext, currentId); + } catch (DataValidationFailedException | RuntimeException e) { failPreCommit(e); return; } @@ -836,14 +1005,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override public void onSuccess(final Void noop) { // Set the tip of the data tree. - tip = Verify.verifyNotNull(candidate); + tip = verifyNotNull(candidate); entry.lastAccess = readTime(); pendingTransactions.remove(); pendingCommits.add(entry); - LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); + LOG.debug("{}: Transaction {} prepared", logContext, currentId); cohort.successfulPreCommit(candidate); @@ -883,6 +1052,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } + allMetadataCommittedTransaction(txId); shard.getShardMBean().incrementCommittedTransactionCount(); shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); @@ -897,7 +1067,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) { final CommitEntry entry = pendingCommits.peek(); - Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort); + checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort); final SimpleShardDataTreeCohort current = entry.cohort; if (!cohort.equals(current)) { @@ -910,7 +1080,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final TransactionIdentifier txId = cohort.getIdentifier(); final Payload payload; try { - payload = CommitTransactionPayload.create(txId, candidate); + payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(), + shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()); } catch (IOException e) { LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e); pendingCommits.poll().cohort.failedCommit(e); @@ -965,27 +1136,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final Optional> participatingShardNames) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, - cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable), - COMMIT_STEP_TIMEOUT)); + cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf, + COMMIT_STEP_TIMEOUT), participatingShardNames); pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics // the newReadWriteTransaction() - ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final Optional> participatingShardNames) { if (txId.getHistoryId().getHistoryId() == 0) { - return createReadyCohort(txId, mod); + return createReadyCohort(txId, mod, participatingShardNames); } - return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod); + return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod, participatingShardNames); } @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis, - final Function> accessTimeUpdater) { + final Function accessTimeUpdater) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = readTime(); @@ -1003,9 +1176,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } - final Optional updateOpt = accessTimeUpdater.apply(currentTx.cohort); + final OptionalLong updateOpt = accessTimeUpdater.apply(currentTx.cohort); if (updateOpt.isPresent()) { - final long newAccess = updateOpt.get().longValue(); + final long newAccess = updateOpt.getAsLong(); final long newDelta = now - newAccess; if (newDelta < delta) { LOG.debug("{}: Updated current transaction {} access time", logContext, @@ -1133,8 +1306,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressWarnings("checkstyle:IllegalCatch") - private void rebaseTransactions(final Iterator iter, @Nonnull final DataTreeTip newTip) { - tip = Preconditions.checkNotNull(newTip); + private void rebaseTransactions(final Iterator iter, final @NonNull DataTreeTip newTip) { + tip = requireNonNull(newTip); while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort; if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {