X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=c829c035da478ffef3032156b317c8f0e4363715;hb=4b13f57a2f095a5e837d5d919a52d867ded8d373;hp=c37703c3359b503bf1e65cab8e46d49fdd9b5353;hpb=634dfac8eead60f443bf75e749c70d1f2bb29198;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index c37703c335..c829c035da 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -40,9 +40,8 @@ import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; @@ -80,20 +79,19 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; /** - * Internal shard state, similar to a DOMStore, but optimized for use in the actor system, - * e.g. it does not expose public interfaces and assumes it is only ever called from a - * single thread. + * Internal shard state, similar to a DOMStore, but optimized for use in the actor system, e.g. it does not expose + * public interfaces and assumes it is only ever called from a single thread. * *

- * This class is not part of the API contract and is subject to change at any time. + * This class is not part of the API contract and is subject to change at any time. It is NOT thread-safe. */ -@NotThreadSafe public class ShardDataTree extends ShardDataTreeTransactionParent { private static final class CommitEntry { final SimpleShardDataTreeCohort cohort; @@ -110,7 +108,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(FiniteDuration.create(5, TimeUnit.SECONDS)); private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); /** @@ -148,6 +146,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private DataTreeTip tip; private SchemaContext schemaContext; + private DataSchemaContextTree dataSchemaContext; private int currentTransactionBatch; @@ -207,6 +206,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { void updateSchemaContext(final SchemaContext newSchemaContext) { dataTree.setSchemaContext(newSchemaContext); this.schemaContext = Preconditions.checkNotNull(newSchemaContext); + this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext); } void resetTransactionBatch() { @@ -218,7 +218,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * * @return A state snapshot */ - @Nonnull ShardDataTreeSnapshot takeStateSnapshot() { + @NonNull ShardDataTreeSnapshot takeStateSnapshot() { final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get(); final Builder>, ShardDataTreeSnapshotMetadata> metaBuilder = ImmutableMap.builder(); @@ -237,7 +237,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return !pendingTransactions.isEmpty() || !pendingCommits.isEmpty() || !pendingFinishCommits.isEmpty(); } - private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot, + private void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot, final UnaryOperator wrapper) throws DataValidationFailedException { final Stopwatch elapsed = Stopwatch.createStarted(); @@ -288,12 +288,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { applySnapshot(snapshot, UnaryOperator.identity()); } private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) { - return new PruningDataTreeModification(delegate, dataTree, schemaContext); + return new PruningDataTreeModification(delegate, dataTree, dataSchemaContext); } private static DataTreeModification unwrap(final DataTreeModification modification) { @@ -310,7 +310,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param snapshot Snapshot that needs to be applied * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoverySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + void applyRecoverySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { applySnapshot(snapshot, this::wrapWithPruning); } @@ -344,7 +344,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @throws IOException when the snapshot fails to deserialize * @throws DataValidationFailedException when the snapshot fails to apply */ - void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException { + void applyRecoveryPayload(final @NonNull Payload payload) throws IOException { if (payload instanceof CommitTransactionPayload) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); @@ -365,7 +365,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign) + private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException { LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); @@ -378,6 +378,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeCandidate candidate = dataTree.prepare(mod); dataTree.commit(candidate); + allMetadataCommittedTransaction(identifier); notifyListeners(candidate); } @@ -404,18 +405,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * pre-Boron state -- which limits the number of options here. */ if (payload instanceof CommitTransactionPayload) { - final TransactionIdentifier txId; if (identifier == null) { final Entry e = ((CommitTransactionPayload) payload).getCandidate(); - txId = e.getKey(); - applyReplicatedCandidate(txId, e.getValue()); + applyReplicatedCandidate(e.getKey(), e.getValue()); } else { Verify.verify(identifier instanceof TransactionIdentifier); - txId = (TransactionIdentifier) identifier; - payloadReplicationComplete(txId); + payloadReplicationComplete((TransactionIdentifier) identifier); } - allMetadataCommittedTransaction(txId); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); @@ -446,7 +443,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) { + private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) { if (callback != null) { replicationCallbacks.put(payload, callback); } @@ -467,12 +464,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final CommitEntry current = pendingFinishCommits.peek(); if (current == null) { LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); + allMetadataCommittedTransaction(txId); return; } if (!current.cohort.getIdentifier().equals(txId)) { LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext, current.cohort.getIdentifier(), txId); + allMetadataCommittedTransaction(txId); return; } @@ -533,12 +532,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId, - @Nullable final Runnable callback) { + final @Nullable Runnable callback) { ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { chain = new ShardDataTreeTransactionChain(historyId, this); transactionChains.put(historyId, chain); - replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback); + replicatePayload(historyId, CreateLocalHistoryPayload.create( + historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } else if (callback != null) { callback.run(); } @@ -547,6 +547,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) { + shard.getShardMBean().incrementReadOnlyTransactionCount(); + if (txId.getHistoryId().getHistoryId() == 0) { return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot()); } @@ -555,6 +557,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) { + shard.getShardMBean().incrementReadWriteTransactionCount(); + if (txId.getHistoryId().getHistoryId() == 0) { return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot() .newModification()); @@ -587,18 +591,34 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param id History identifier * @param callback Callback to invoke upon completion, may be null */ - void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) { + void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { + if (commonCloseTransactionChain(id, callback)) { + replicatePayload(id, CloseLocalHistoryPayload.create(id, + shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); + } + } + + /** + * Close a single transaction chain which is received through ask-based protocol. It does not keep a commit record. + * + * @param id History identifier + */ + void closeTransactionChain(final LocalHistoryIdentifier id) { + commonCloseTransactionChain(id, null); + } + + private boolean commonCloseTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { final ShardDataTreeTransactionChain chain = transactionChains.get(id); if (chain == null) { LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id); if (callback != null) { callback.run(); } - return; + return false; } chain.close(); - replicatePayload(id, CloseLocalHistoryPayload.create(id), callback); + return true; } /** @@ -607,7 +627,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { * @param id History identifier * @param callback Callback to invoke upon completion, may be null */ - void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) { + void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) { final ShardDataTreeTransactionChain chain = transactionChains.remove(id); if (chain == null) { LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id); @@ -617,7 +637,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } - replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); + replicatePayload(id, PurgeLocalHistoryPayload.create( + id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } Optional readCurrentData() { @@ -641,7 +662,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { void abortTransaction(final AbstractShardDataTreeTransaction transaction, final Runnable callback) { final TransactionIdentifier id = transaction.getIdentifier(); LOG.debug("{}: aborting transaction {}", logContext, id); - replicatePayload(id, AbortTransactionPayload.create(id), callback); + replicatePayload(id, AbortTransactionPayload.create( + id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } @Override @@ -661,7 +683,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { LOG.debug("{}: purging transaction {}", logContext, id); - replicatePayload(id, PurgeTransactionPayload.create(id), callback); + replicatePayload(id, PurgeTransactionPayload.create( + id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } public Optional> readNode(final YangInstanceIdentifier path) { @@ -873,7 +896,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingTransaction(); } - private void insertEntry(final Deque queue, final CommitEntry entry, final int atIndex) { + private static void insertEntry(final Deque queue, final CommitEntry entry, final int atIndex) { if (atIndex == 0) { queue.addFirst(entry); return; @@ -972,6 +995,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return; } + allMetadataCommittedTransaction(txId); shard.getShardMBean().incrementCommittedTransactionCount(); shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); @@ -999,7 +1023,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final TransactionIdentifier txId = cohort.getIdentifier(); final Payload payload; try { - payload = CommitTransactionPayload.create(txId, candidate); + payload = CommitTransactionPayload.create(txId, candidate, + shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()); } catch (IOException e) { LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e); pendingCommits.poll().cohort.failedCommit(e); @@ -1224,7 +1249,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @SuppressWarnings("checkstyle:IllegalCatch") - private void rebaseTransactions(final Iterator iter, @Nonnull final DataTreeTip newTip) { + private void rebaseTransactions(final Iterator iter, final @NonNull DataTreeTip newTip) { tip = Preconditions.checkNotNull(newTip); while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort;