Transaction chaining has mucked inside SnapshotBackedWriteTransaction to
get its state everytime a new transaction was allocated. Instead we
expose proper lifecycle hooks, so the chain handling logic can keep
track of state itself.
As an added feature, this fixes a bug, where a chain would become
unusable if the user closed a write transaction without actually calling
ready() on it.
Finally it fixes a synchronization bug, where a commit task would not be
properly synchronized with listener registrations.
Change-Id: Iebe4701f1712a95a9316ea8380fe12c8c5fe6b89
Signed-off-by: Robert Varga <rovarga@cisco.com>
package org.opendaylight.controller.md.sal.dom.store.impl;
import static com.google.common.base.Preconditions.checkState;
package org.opendaylight.controller.md.sal.dom.store.impl;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
* to implement {@link DOMStore} contract.
*
*/
* to implement {@link DOMStore} contract.
*
*/
-public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
- TransactionReadyPrototype,AutoCloseable {
+public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
"DataChangeListenerQueueMgr");
}
"DataChangeListenerQueueMgr");
}
- public void setCloseable(AutoCloseable closeable) {
+ public void setCloseable(final AutoCloseable closeable) {
this.closeable = closeable;
}
this.closeable = closeable;
}
- public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction writeTx) {
- LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView());
- return new ThreePhaseCommitImpl(writeTx);
+ protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+ LOG.debug("Tx: {} is closed.", tx.getIdentifier());
+ }
+
+ @Override
+ protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
+ LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), tree);
+ return new ThreePhaseCommitImpl(tx, tree);
}
private Object nextIdentifier() {
return name + "-" + txCounter.getAndIncrement();
}
}
private Object nextIdentifier() {
return name + "-" + txCounter.getAndIncrement();
}
- private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype {
-
+ private class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
+ @GuardedBy("this")
+ private SnapshotBackedWriteTransaction allocatedTransaction;
+ @GuardedBy("this")
+ private DataTreeSnapshot readySnapshot;
- private SnapshotBackedWriteTransaction latestOutstandingTx;
-
private boolean chainFailed = false;
private boolean chainFailed = false;
private void checkFailed() {
Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
}
private void checkFailed() {
Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
}
- @Override
- public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
- final DataTreeSnapshot snapshot;
+ @GuardedBy("this")
+ private DataTreeSnapshot getSnapshot() {
- if (latestOutstandingTx != null) {
- checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = latestOutstandingTx.getMutatedView();
+
+ if (allocatedTransaction != null) {
+ Preconditions.checkState(readySnapshot != null, "Previous transaction %s is not ready yet", allocatedTransaction.getIdentifier());
+ return readySnapshot;
- snapshot = dataTree.takeSnapshot();
+ return dataTree.takeSnapshot();
+ }
+
+ @GuardedBy("this")
+ private <T extends SnapshotBackedWriteTransaction> T recordTransaction(final T transaction) {
+ allocatedTransaction = transaction;
+ readySnapshot = null;
+ return transaction;
+ }
+
+ @Override
+ public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
+ final DataTreeSnapshot snapshot = getSnapshot();
return new SnapshotBackedReadTransaction(nextIdentifier(), getDebugTransactions(), snapshot);
}
@Override
public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
return new SnapshotBackedReadTransaction(nextIdentifier(), getDebugTransactions(), snapshot);
}
@Override
public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
- final DataTreeSnapshot snapshot;
- checkFailed();
- if (latestOutstandingTx != null) {
- checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = latestOutstandingTx.getMutatedView();
- } else {
- snapshot = dataTree.takeSnapshot();
- }
- final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
- getDebugTransactions(), snapshot, this);
- latestOutstandingTx = ret;
- return ret;
+ final DataTreeSnapshot snapshot = getSnapshot();
+ return recordTransaction(new SnapshotBackedReadWriteTransaction(nextIdentifier(),
+ getDebugTransactions(), snapshot, this));
}
@Override
public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
}
@Override
public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
- final DataTreeSnapshot snapshot;
- checkFailed();
- if (latestOutstandingTx != null) {
- checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = latestOutstandingTx.getMutatedView();
- } else {
- snapshot = dataTree.takeSnapshot();
+ final DataTreeSnapshot snapshot = getSnapshot();
+ return recordTransaction(new SnapshotBackedWriteTransaction(nextIdentifier(),
+ getDebugTransactions(), snapshot, this));
+ }
+
+ @Override
+ protected synchronized void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+ if (tx.equals(allocatedTransaction)) {
+ Preconditions.checkState(readySnapshot == null, "Unexpected abort of transaction %s with ready snapshot %s", tx, readySnapshot);
+ allocatedTransaction = null;
- final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(),
- getDebugTransactions(), snapshot, this);
- latestOutstandingTx = ret;
- return ret;
- public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) {
- DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx);
- return new ChainedTransactionCommitImpl(tx, storeCohort, this);
+ protected synchronized DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
+ Preconditions.checkState(tx.equals(allocatedTransaction), "Mis-ordered ready transaction %s last allocated was %s", tx, allocatedTransaction);
+ if (readySnapshot != null) {
+ // The snapshot should have been cleared
+ LOG.warn("Uncleared snapshot {} encountered, overwritten with transaction {} snapshot {}", readySnapshot, tx, tree);
+ }
+
+ final DOMStoreThreePhaseCommitCohort cohort = InMemoryDOMDataStore.this.transactionReady(tx, tree);
+ readySnapshot = tree;
+ return new ChainedTransactionCommitImpl(tx, cohort, this);
}
@Override
public void close() {
}
@Override
public void close() {
// FIXME: this call doesn't look right here - listeningExecutor is shared and owned
// by the outer class.
//listeningExecutor.shutdownNow();
// FIXME: this call doesn't look right here - listeningExecutor is shared and owned
// by the outer class.
//listeningExecutor.shutdownNow();
protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
final Throwable t) {
chainFailed = true;
protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
final Throwable t) {
chainFailed = true;
}
public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
}
public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
- // If committed transaction is latestOutstandingTx we clear
- // latestOutstandingTx
- // field in order to base new transactions on Datastore Data Tree
- // directly.
- if (transaction.equals(latestOutstandingTx)) {
- latestOutstandingTx = null;
+ // If the committed transaction was the one we allocated last,
+ // we clear it and the ready snapshot, so the next transaction
+ // allocated refers to the data tree directly.
+ if (transaction.equals(allocatedTransaction)) {
+ if (readySnapshot == null) {
+ LOG.warn("Transaction {} committed while no ready snapshot present", transaction);
+ }
+
+ allocatedTransaction = null;
+ readySnapshot = null;
}
private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
}
private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
private final SnapshotBackedWriteTransaction transaction;
private final DOMStoreThreePhaseCommitCohort delegate;
private final SnapshotBackedWriteTransaction transaction;
private final DOMStoreThreePhaseCommitCohort delegate;
private final DOMStoreTransactionChainImpl txChain;
protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
private final DOMStoreTransactionChainImpl txChain;
protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
this.transaction = transaction;
this.delegate = delegate;
this.txChain = txChain;
this.transaction = transaction;
this.delegate = delegate;
this.txChain = txChain;
public void onSuccess(final Void result) {
txChain.onTransactionCommited(transaction);
}
public void onSuccess(final Void result) {
txChain.onTransactionCommited(transaction);
}
});
return commitFuture;
}
});
return commitFuture;
}
}
private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
}
private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
private final SnapshotBackedWriteTransaction transaction;
private final DataTreeModification modification;
private ResolveDataChangeEventsTask listenerResolver;
private DataTreeCandidate candidate;
private final SnapshotBackedWriteTransaction transaction;
private final DataTreeModification modification;
private ResolveDataChangeEventsTask listenerResolver;
private DataTreeCandidate candidate;
- public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
+ public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction, final DataTreeModification modification) {
this.transaction = writeTransaction;
this.transaction = writeTransaction;
- this.modification = transaction.getMutatedView();
+ this.modification = modification;
* The commit has to occur atomically with regard to listener
* registrations.
*/
* The commit has to occur atomically with regard to listener
* registrations.
*/
+ synchronized (InMemoryDOMDataStore.this) {
dataTree.commit(candidate);
listenerResolver.resolve(dataChangeListenerNotificationManager);
}
dataTree.commit(candidate);
listenerResolver.resolve(dataChangeListenerNotificationManager);
}
private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, DataTreeModification> TREE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, DataTreeModification.class, "mutableTree");
private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, DataTreeModification> TREE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, DataTreeModification.class, "mutableTree");
- private volatile TransactionReadyPrototype readyImpl; // non-null when not ready
- private volatile DataTreeModification mutableTree; // non-null when not committed/closed
+ // non-null when not ready
+ private volatile TransactionReadyPrototype readyImpl;
+ // non-null when not committed/closed
+ private volatile DataTreeModification mutableTree;
/**
* Creates new write-only transaction.
/**
* Creates new write-only transaction.
checkState(wasReady != null, "Transaction %s is no longer open", getIdentifier());
LOG.debug("Store transaction: {} : Ready", getIdentifier());
checkState(wasReady != null, "Transaction %s is no longer open", getIdentifier());
LOG.debug("Store transaction: {} : Ready", getIdentifier());
- mutableTree.ready();
- return wasReady.ready(this);
+
+ final DataTreeModification tree = mutableTree;
+ TREE_UPDATER.lazySet(this, null);
+ tree.ready();
+ return wasReady.transactionReady(this, tree);
if (wasReady != null) {
LOG.debug("Store transaction: {} : Closed", getIdentifier());
TREE_UPDATER.lazySet(this, null);
if (wasReady != null) {
LOG.debug("Store transaction: {} : Closed", getIdentifier());
TREE_UPDATER.lazySet(this, null);
+ wasReady.transactionAborted(this);
} else {
LOG.debug("Store transaction: {} : Closed after submit", getIdentifier());
}
} else {
LOG.debug("Store transaction: {} : Closed after submit", getIdentifier());
}
return toStringHelper.add("ready", readyImpl == null);
}
return toStringHelper.add("ready", readyImpl == null);
}
- // FIXME: used by chaining on, which really wants an mutated view with a precondition
- final boolean isReady() {
- return readyImpl == null;
- }
-
- protected DataTreeModification getMutatedView() {
- return mutableTree;
- }
-
/**
* Prototype implementation of
* {@link #ready(org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction)}
/**
* Prototype implementation of
* {@link #ready(org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction)}
* providing underlying logic for applying implementation.
*
*/
* providing underlying logic for applying implementation.
*
*/
- // FIXME: needs access to local stuff, so make it an abstract class
- public static interface TransactionReadyPrototype {
+ abstract static class TransactionReadyPrototype {
+ /**
+ * Called when a transaction is closed without being readied. This is not invoked for
+ * transactions which are ready.
+ *
+ * @param tx Transaction which got aborted.
+ */
+ protected abstract void transactionAborted(final SnapshotBackedWriteTransaction tx);
+
/**
* Returns a commit coordinator associated with supplied transactions.
*
/**
* Returns a commit coordinator associated with supplied transactions.
*
*
* @param tx
* Transaction on which ready was invoked.
*
* @param tx
* Transaction on which ready was invoked.
+ * @param tree
+ * Modified data tree which has been constructed.
* @return DOMStoreThreePhaseCommitCohort associated with transaction
*/
* @return DOMStoreThreePhaseCommitCohort associated with transaction
*/
- DOMStoreThreePhaseCommitCohort ready(SnapshotBackedWriteTransaction tx);
+ protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction tx, DataTreeModification tree);
}
}
\ No newline at end of file
}
}
\ No newline at end of file