X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=20cee65c37ca50e62baf09cd3c4db25539242f8c;hp=613f9adbc9355c4cabb8a776f31069bdfba6660c;hb=4a9e1103eef316f18c4a14cfccb050bddc01564b;hpb=de64c6bbf2d5aeb51f4036f9dd606a9bf6f71afb diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 613f9adbc9..20cee65c37 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -45,12 +45,14 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifiablePayload; import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload; +import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; @@ -136,7 +138,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private SchemaContext schemaContext; - public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, final ShardDataTreeMetadata... metadata) { @@ -151,12 +153,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { tip = dataTree; } - public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, + ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, final YangInstanceIdentifier root, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, - final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) { + final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, + final ShardDataTreeMetadata... metadata) { this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root), - treeChangeListenerPublisher, dataChangeListenerPublisher, logContext); + treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata); } @VisibleForTesting @@ -323,6 +326,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ((CommitTransactionPayload) payload).getCandidate(); applyRecoveryCandidate(e.getValue()); allMetadataCommittedTransaction(e.getKey()); + } else if (payload instanceof AbortTransactionPayload) { + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); + } else if (payload instanceof PurgeTransactionPayload) { + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); } else if (payload instanceof CreateLocalHistoryPayload) { allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof CloseLocalHistoryPayload) { @@ -384,6 +391,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { Verify.verify(identifier instanceof TransactionIdentifier); payloadReplicationComplete((TransactionIdentifier) identifier); } + } else if (payload instanceof AbortTransactionPayload) { + if (identifier != null) { + payloadReplicationComplete((AbortTransactionPayload) payload); + } else { + allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier()); + } + } else if (payload instanceof PurgeTransactionPayload) { + if (identifier != null) { + payloadReplicationComplete((PurgeTransactionPayload) payload); + } else { + allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier()); + } + } else if (payload instanceof CloseLocalHistoryPayload) { + if (identifier != null) { + payloadReplicationComplete((CloseLocalHistoryPayload) payload); + } else { + allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); + } } else if (payload instanceof CloseLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CloseLocalHistoryPayload) payload); @@ -440,12 +465,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { finishCommit(current.cohort); } + private void allMetadataAbortedTransaction(final TransactionIdentifier txId) { + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionAborted(txId); + } + } + private void allMetadataCommittedTransaction(final TransactionIdentifier txId) { for (ShardDataTreeMetadata m : metadata) { m.onTransactionCommitted(txId); } } + private void allMetadataPurgedTransaction(final TransactionIdentifier txId) { + for (ShardDataTreeMetadata m : metadata) { + m.onTransactionPurged(txId); + } + } + private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) { for (ShardDataTreeMetadata m : metadata) { m.onHistoryCreated(historyId); @@ -464,6 +501,23 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + /** + * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)}, + * this method is used for re-establishing state when we are taking over + * + * @param historyId Local history identifier + * @param closed True if the chain should be created in closed state (i.e. pending purge) + * @return Transaction chain handle + */ + ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId, + final boolean closed) { + final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this); + final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret); + Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, + existing); + return ret; + } + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) { ShardDataTreeTransactionChain chain = transactionChains.get(historyId); if (chain == null) { @@ -477,7 +531,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) { if (txId.getHistoryId().getHistoryId() == 0) { - return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot()); + return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot()); } return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId); @@ -518,14 +572,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } /** - * Immediately close all transaction chains. + * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled + * replication callbacks. */ - void closeAllTransactionChains() { + void purgeLeaderState() { for (ShardDataTreeTransactionChain chain : transactionChains.values()) { chain.close(); } transactionChains.clear(); + replicationCallbacks.clear(); } /** @@ -597,8 +653,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } @Override - void abortTransaction(final AbstractShardDataTreeTransaction transaction) { - // Intentional no-op + void abortTransaction(final AbstractShardDataTreeTransaction transaction, final Runnable callback) { + final TransactionIdentifier id = transaction.getIdentifier(); + LOG.debug("{}: aborting transaction {}", logContext, id); + replicatePayload(id, AbortTransactionPayload.create(id), callback); + } + + @Override + void abortFromTransactionActor(final AbstractShardDataTreeTransaction transaction) { + // No-op for free-standing transactions + } @Override @@ -609,6 +673,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return createReadyCohort(transaction.getIdentifier(), snapshot); } + void purgeTransaction(final TransactionIdentifier id, final Runnable callback) { + LOG.debug("{}: purging transaction {}", logContext, id); + replicatePayload(id, PurgeTransactionPayload.create(id), callback); + } + public Optional> readNode(final YangInstanceIdentifier path) { return dataTree.takeSnapshot().readNode(path); }