X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=246f81272a014772071cb2a595ca7bce02315e48;hb=4447f81c26b851e46acd3f111768bb498f0d553f;hp=be4aa42669ff77fb653f59082f664dc85deaf7eb;hpb=a21acd04ac162c1eb84a997ae0c9ac9df185422c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index be4aa42669..246f81272a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -24,7 +24,6 @@ import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -64,7 +63,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -166,7 +164,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) { this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY, - new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), ""); + new DefaultShardDataTreeChangeListenerPublisher(""), + new DefaultShardDataChangeListenerPublisher(""), ""); } final String logContext() { @@ -543,27 +542,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @VisibleForTesting public void notifyListeners(final DataTreeCandidate candidate) { - treeChangeListenerPublisher.publishChanges(candidate, logContext); - dataChangeListenerPublisher.publishChanges(candidate, logContext); - } - - void notifyOfInitialData(final DataChangeListenerRegistration>> listenerReg, final Optional currentState) { - if (currentState.isPresent()) { - ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance(); - localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(), - listenerReg.getScope()); - localPublisher.publishChanges(currentState.get(), logContext); - } - } - - void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, - final Optional currentState) { - if (currentState.isPresent()) { - ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance(); - localPublisher.registerTreeChangeListener(path, listener); - localPublisher.publishChanges(currentState.get(), logContext); - } + treeChangeListenerPublisher.publishChanges(candidate); + dataChangeListenerPublisher.publishChanges(candidate); } /** @@ -618,29 +598,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); } - Entry>>, - Optional> registerChangeListener(final YangInstanceIdentifier path, - final AsyncDataChangeListener> listener, - final DataChangeScope scope) { - DataChangeListenerRegistration>> reg = - dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope); - - return new SimpleEntry<>(reg, readCurrentData()); + void registerDataChangeListener(final YangInstanceIdentifier path, + final AsyncDataChangeListener> listener, + final DataChangeScope scope, final Optional initialState, + final Consumer>>> + onRegistration) { + dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration); } - private Optional readCurrentData() { + Optional readCurrentData() { final Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); } - public Entry, Optional> - registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { - final ListenerRegistration reg = - treeChangeListenerPublisher.registerTreeChangeListener(path, listener); - - return new SimpleEntry<>(reg, readCurrentData()); + public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, + final Optional initialState, + final Consumer> onRegistration) { + treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration); } int getQueueSize() { @@ -951,7 +927,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { cohortRegistry.process(sender, message); } - @Override ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, final Exception failure) { @@ -968,6 +943,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return cohort; } + // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics + // the newReadWriteTransaction() + ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + if (txId.getHistoryId().getHistoryId() == 0) { + return createReadyCohort(txId, mod); + } + + return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod); + } + @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);