X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=613f9adbc9355c4cabb8a776f31069bdfba6660c;hb=refs%2Fchanges%2F26%2F39426%2F73;hp=78b49a60ae5cef444b28633fd9b8fe397d117914;hpb=823bd74f34ee1c651f1f90daeef386a35c68d431;p=controller.git

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index 78b49a60ae..613f9adbc9 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
@@ -39,13 +39,18 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.UnaryOperator;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload;
+import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
@@ -69,7 +74,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
@@ -103,10 +107,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
     private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
+
+    /**
+     * Callbacks that need to be invoked once a payload is replicated.
+     */
+    private final Map<Payload, Runnable> replicationCallbacks = new HashMap<>();
+
     private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
     private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
     private final Collection<ShardDataTreeMetadata<?>> metadata;
@@ -151,8 +162,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
-                new DefaultShardDataTreeChangeListenerPublisher(),
-                new DefaultShardDataChangeListenerPublisher(), "");
+                new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), "");
     }
 
     final String logContext() {
@@ -313,6 +323,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     ((CommitTransactionPayload) payload).getCandidate();
             applyRecoveryCandidate(e.getValue());
             allMetadataCommittedTransaction(e.getKey());
+        } else if (payload instanceof CreateLocalHistoryPayload) {
+            allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof CloseLocalHistoryPayload) {
+            allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof PurgeLocalHistoryPayload) {
+            allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof DataTreeCandidatePayload) {
             applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
         } else {
@@ -368,11 +384,46 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 Verify.verify(identifier instanceof TransactionIdentifier);
                 payloadReplicationComplete((TransactionIdentifier) identifier);
             }
+        } else if (payload instanceof CloseLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((CloseLocalHistoryPayload) payload);
+            } else {
+                allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof CreateLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((CreateLocalHistoryPayload)payload);
+            } else {
+                allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+            }
+        } else if (payload instanceof PurgeLocalHistoryPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
+            } else {
+                allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+            }
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
     }
 
+    private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) {
+        if (callback != null) {
+            replicationCallbacks.put(payload, callback);
+        }
+        shard.persistPayload(id, payload, true);
+    }
+
+    private void payloadReplicationComplete(final AbstractIdentifiablePayload<?> payload) {
+        final Runnable callback = replicationCallbacks.remove(payload);
+        if (callback != null) {
+            LOG.debug("{}: replication of {} completed, invoking {}", logContext, payload.getIdentifier(), callback);
+            callback.run();
+        } else {
+            LOG.debug("{}: replication of {} has no callback", logContext, payload.getIdentifier());
+        }
+    }
+
     private void payloadReplicationComplete(final TransactionIdentifier txId) {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
@@ -395,11 +446,30 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
-        ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
+    private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryCreated(historyId);
+        }
+    }
+
+    private void allMetadataClosedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryClosed(historyId);
+        }
+    }
+
+    private void allMetadataPurgedLocalHistory(final LocalHistoryIdentifier historyId) {
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onHistoryPurged(historyId);
+        }
+    }
+
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+        ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
-            chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
-            transactionChains.put(localHistoryIdentifier, chain);
+            chain = new ShardDataTreeTransactionChain(historyId, this);
+            transactionChains.put(historyId, chain);
+            shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true);
         }
 
         return chain;
@@ -447,6 +517,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
+    /**
+     * Immediately close all transaction chains.
+     */
     void closeAllTransactionChains() {
         for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
             chain.close();
@@ -455,13 +528,43 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         transactionChains.clear();
     }
 
-    void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) {
-        final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
-        if (chain != null) {
-            chain.close();
-        } else {
-            LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId);
+    /**
+     * Close a single transaction chain.
+     *
+     * @param id History identifier
+     * @param callback Callback to invoke upon completion, may be null
+     */
+    void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+        if (chain == null) {
+            LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
         }
+
+        chain.close();
+        replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+    }
+
+    /**
+     * Purge a single transaction chain.
+     *
+     * @param id History identifier
+     * @param callback Callback to invoke upon completion, may be null
+     */
+    void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
+        if (chain == null) {
+            LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
+        }
+
+        replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
     }
 
     Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
@@ -571,7 +674,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             tip.validate(modification);
             LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
             cohort.successfulCanCommit();
-            entry.lastAccess = shard.ticker().read();
+            entry.lastAccess = ticker().read();
             return;
         } catch (ConflictingModificationAppliedException e) {
             LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
@@ -597,12 +700,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     private void processNextPending() {
-        processNextPendingFinishCommit();
         processNextPendingCommit();
         processNextPendingTransaction();
     }
 
-    private void processNextPending(Queue<CommitEntry> queue, State allowedState, Consumer<CommitEntry> processor) {
+    private void processNextPending(final Queue<CommitEntry> queue, final State allowedState,
+            final Consumer<CommitEntry> processor) {
         while (!queue.isEmpty()) {
             final CommitEntry entry = queue.peek();
             final SimpleShardDataTreeCohort cohort = entry.cohort;
@@ -628,11 +731,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             entry -> startCommit(entry.cohort, entry.cohort.getCandidate()));
     }
 
-    private void processNextPendingFinishCommit() {
-        processNextPending(pendingFinishCommits, State.FINISH_COMMIT_PENDING,
-            entry -> payloadReplicationComplete(entry.cohort.getIdentifier()));
-    }
-
     private boolean peekNextPendingCommit() {
         final CommitEntry first = pendingCommits.peek();
         return first != null && first.cohort.getState() == State.COMMIT_PENDING;
@@ -676,7 +774,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         // Set the tip of the data tree.
         tip = Verify.verifyNotNull(candidate);
 
-        entry.lastAccess = shard.ticker().read();
+        entry.lastAccess = ticker().read();
 
         pendingTransactions.remove();
         pendingCommits.add(entry);
@@ -739,15 +837,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
 
         final TransactionIdentifier txId = cohort.getIdentifier();
-        if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
-            LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
-            pendingCommits.remove();
-            pendingFinishCommits.add(entry);
-            cohort.finishCommitPending();
-            payloadReplicationComplete(txId);
-            return;
-        }
-
         final Payload payload;
         try {
             payload = CommitTransactionPayload.create(txId, candidate);
@@ -788,6 +877,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingCommit();
     }
 
+    Collection<ActorRef> getCohortActors() {
+        return cohortRegistry.getCohortActors();
+    }
+
     void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
         cohortRegistry.process(sender, message);
     }
@@ -797,14 +890,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             final DataTreeModification modification) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
                 cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
-        pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read()));
+        pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
         return cohort;
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
         final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
-        final long now = shard.ticker().read();
+        final long now = ticker().read();
 
         final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
             !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
@@ -916,7 +1009,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void rebaseTransactions(Iterator<CommitEntry> iter, @Nonnull TipProducingDataTreeTip newTip) {
+    private void rebaseTransactions(final Iterator<CommitEntry> iter, @Nonnull final TipProducingDataTreeTip newTip) {
         tip = Preconditions.checkNotNull(newTip);
         while (iter.hasNext()) {
             final SimpleShardDataTreeCohort cohort = iter.next().cohort;
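
Note on the replication-callback bookkeeping introduced above: replicatePayload() remembers an optional Runnable keyed by the payload instance before handing the payload to the shard's persistence/replication machinery, and payloadReplicationComplete() removes and fires that callback exactly once when the same payload instance is reported back as replicated. The standalone sketch below mirrors that shape in isolation; the class and method names are hypothetical stand-ins, not part of this patch or of the controller APIs, and it assumes a single-threaded caller (the real class is annotated @NotThreadSafe).

    import java.util.HashMap;
    import java.util.Map;

    public final class ReplicationCallbackDemo {

        /** Hypothetical stand-in mirroring the replicationCallbacks bookkeeping added by this patch. */
        static final class ReplicationTracker<P> {
            // Callbacks keyed by the payload instance handed to the replication layer.
            private final Map<P, Runnable> callbacks = new HashMap<>();

            // Leader-side entry point: remember the callback, then hand the payload off
            // to persistence/replication (elided here).
            void replicate(final P payload, final Runnable callback) {
                if (callback != null) {
                    callbacks.put(payload, callback);
                }
                // The real code invokes shard.persistPayload(id, payload, true) at this point.
            }

            // Invoked once the replication layer signals that the payload has been applied.
            void replicationComplete(final P payload) {
                final Runnable callback = callbacks.remove(payload);
                if (callback != null) {
                    callback.run();
                }
            }
        }

        public static void main(final String[] args) {
            final ReplicationTracker<String> tracker = new ReplicationTracker<>();
            final String purgePayload = "PurgeLocalHistoryPayload[history-1]";

            // closeTransactionChain()/purgeTransactionChain() pass the caller's callback here.
            tracker.replicate(purgePayload, () -> System.out.println("purge replicated"));

            // Later, applyReplicatedPayload() reports the same payload instance back.
            tracker.replicationComplete(purgePayload);
        }
    }

The effect, as reflected in the closeTransactionChain()/purgeTransactionChain() javadoc, is that the caller's completion callback only runs after the corresponding CloseLocalHistoryPayload or PurgeLocalHistoryPayload has actually been persisted and replicated.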