X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=1460982bf3d4010fa4558074033dd3df7b510033;hb=refs%2Fchanges%2F31%2F55931%2F1;hp=3b15b656b35f64fd0441da8a9c28489f2dc4a8dc;hpb=17a38939f6ba3cbbc1ff0f1f3e00b58f5002813d;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 3b15b656b3..1460982bf3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -24,7 +24,6 @@ import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -63,7 +62,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -336,8 +334,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } else if (payload instanceof PurgeLocalHistoryPayload) { allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier()); - } else if (payload instanceof DataTreeCandidatePayload) { - applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate()); } else { LOG.debug("{}: ignoring unhandled payload {}", logContext, payload); } @@ -409,12 +405,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } else { allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); } - } else if (payload instanceof CloseLocalHistoryPayload) { - if (identifier != null) { - payloadReplicationComplete((CloseLocalHistoryPayload) payload); - } else { - allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier()); - } } else if (payload instanceof CreateLocalHistoryPayload) { if (identifier != null) { payloadReplicationComplete((CreateLocalHistoryPayload)payload); @@ -552,25 +542,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { dataChangeListenerPublisher.publishChanges(candidate, logContext); } - void notifyOfInitialData(final DataChangeListenerRegistration>> listenerReg, final Optional currentState) { - if (currentState.isPresent()) { - ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance(); - localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(), - listenerReg.getScope()); - localPublisher.publishChanges(currentState.get(), logContext); - } - } - - void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, - final Optional currentState) { - if (currentState.isPresent()) { - ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance(); - localPublisher.registerTreeChangeListener(path, listener); - localPublisher.publishChanges(currentState.get(), logContext); - } - } - /** * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled * replication callbacks. @@ -623,29 +594,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); } - Entry>>, - Optional> registerChangeListener(final YangInstanceIdentifier path, - final AsyncDataChangeListener> listener, - final DataChangeScope scope) { - DataChangeListenerRegistration>> reg = - dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope); - - return new SimpleEntry<>(reg, readCurrentData()); + void registerDataChangeListener(final YangInstanceIdentifier path, + final AsyncDataChangeListener> listener, + final DataChangeScope scope, final Optional initialState, + final Consumer>>> + onRegistration) { + dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration); } - private Optional readCurrentData() { + Optional readCurrentData() { final Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); } - public Entry, Optional> - registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { - final ListenerRegistration reg = - treeChangeListenerPublisher.registerTreeChangeListener(path, listener); - - return new SimpleEntry<>(reg, readCurrentData()); + public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, + final Optional initialState, + final Consumer> onRegistration) { + treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration); } int getQueueSize() { @@ -659,6 +626,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { replicatePayload(id, AbortTransactionPayload.create(id), callback); } + @Override + void abortFromTransactionActor(final AbstractShardDataTreeTransaction transaction) { + // No-op for free-standing transactions + + } + @Override ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { final DataTreeModification snapshot = transaction.getSnapshot(); @@ -734,6 +707,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { + cohort.throwCanCommitFailure(); + tip.validate(modification); LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); cohort.successfulCanCommit(); @@ -948,15 +923,33 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { cohortRegistry.process(sender, message); } + @Override + ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod, + final Exception failure) { + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure); + pendingTransactions.add(new CommitEntry(cohort, ticker().read())); + return cohort; + } + @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, - final DataTreeModification modification) { - SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId, + final DataTreeModification mod) { + SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId, cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); pendingTransactions.add(new CommitEntry(cohort, ticker().read())); return cohort; } + // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics + // the newReadWriteTransaction() + ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { + if (txId.getHistoryId().getHistoryId() == 0) { + return createReadyCohort(txId, mod); + } + + return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod); + } + @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.") void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);