import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
-import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
}
/**
- *
* Implementation of blocking three-phase commit-coordination tasks without
- * support of cancelation.
- *
+ * support of cancellation.
*/
- private static class CommitCoordinationTask implements Callable<Void> {
-
+ private static final class CommitCoordinationTask implements Callable<Void> {
+ private static final AtomicReferenceFieldUpdater<CommitCoordinationTask, CommitPhase> PHASE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
private final DOMDataWriteTransaction tx;
private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
private final DurationStatsTracker commitStatTracker;
private final int cohortSize;
-
- @GuardedBy("this")
- private CommitPhase currentPhase;
+ private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
final DurationStatsTracker commitStatTracker) {
this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
- this.currentPhase = CommitPhase.SUBMITTED;
this.commitStatTracker = commitStatTracker;
this.cohortSize = Iterables.size(cohorts);
}
@Override
public Void call() throws TransactionCommitFailedException {
+ final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
- long startTime = System.nanoTime();
try {
canCommitBlocking();
preCommitBlocking();
commitBlocking();
return null;
} catch (TransactionCommitFailedException e) {
- LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
- abortBlocking(e);
+ final CommitPhase phase = currentPhase;
+ LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
+ abortBlocking(e, phase);
throw e;
} finally {
- if(commitStatTracker != null) {
+ if (commitStatTracker != null) {
commitStatTracker.addDuration(System.nanoTime() - startTime);
}
}
* Exception which should be used to fail transaction for
* consumers of transaction
* future and listeners of transaction failure.
+ * @param phase phase in which the problem ensued
* @throws TransactionCommitFailedException
* on invocation of this method.
* originalCa
* @throws IllegalStateException
* if abort failed.
*/
- private void abortBlocking(final TransactionCommitFailedException originalCause)
+ private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase)
throws TransactionCommitFailedException {
- LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
+ LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause);
Exception cause = originalCause;
try {
- abortAsyncAll().get();
+ abortAsyncAll(phase).get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
cause = new IllegalStateException("Abort failed.", e);
}
/**
- *
* Invokes abort on underlying cohorts and returns future which
- * completes
- * once all abort on cohorts are completed.
+ * completes once all abort on cohorts are completed.
*
+ * @param phase phase in which the problem ensued
* @return Future which will complete once all cohorts completed
* abort.
- *
*/
- private ListenableFuture<Void> abortAsyncAll() {
- changeStateFrom(currentPhase, CommitPhase.ABORT);
+ private ListenableFuture<Void> abortAsyncAll(final CommitPhase phase) {
+ changeStateFrom(phase, CommitPhase.ABORT);
final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
int i = 0;
}
/*
- * We are returing all futures as list, not only succeeded ones in
+ * We are returning all futures as list, not only succeeded ones in
* order to fail composite future if any of them failed.
* See Futures.allAsList for this description.
*/
* @throws IllegalStateException
* If currentState of task does not match expected state
*/
- private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
- Preconditions.checkState(currentPhase.equals(currentExpected),
- "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
- currentPhase, newState);
- LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
- currentPhase = newState;
- };
+ private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
+ final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState);
+ Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s",
+ tx.getIdentifier(), currentExpected, currentPhase, newState);
+ LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState);
+ };
}
}
* transactions.
*
*/
-
-class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMStoreReadWriteTransaction> implements
- DOMDataReadWriteTransaction {
-
+final class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMStoreReadWriteTransaction> implements DOMDataReadWriteTransaction {
protected DOMForwardedReadWriteTransaction(final Object identifier,
final Map<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
final DOMDataCommitImplementation commitImpl) {
return getSubtransaction(store).read(path);
}
- @Override public CheckedFuture<Boolean, ReadFailedException> exists(
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(
final LogicalDatastoreType store,
final YangInstanceIdentifier path) {
return getSubtransaction(store).exists(path);
package org.opendaylight.controller.md.sal.dom.store.impl;
import static com.google.common.base.Preconditions.checkState;
-
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
* to implement {@link DOMStore} contract.
*
*/
-public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
- TransactionReadyPrototype,AutoCloseable {
+public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
"DataChangeListenerQueueMgr");
}
- public void setCloseable(AutoCloseable closeable) {
+ public void setCloseable(final AutoCloseable closeable) {
this.closeable = closeable;
}
}
@Override
- public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction writeTx) {
- LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView());
- return new ThreePhaseCommitImpl(writeTx);
+ protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+ LOG.debug("Tx: {} is closed.", tx.getIdentifier());
+ }
+
+ @Override
+ protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
+ LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), tree);
+ return new ThreePhaseCommitImpl(tx, tree);
}
private Object nextIdentifier() {
return name + "-" + txCounter.getAndIncrement();
}
- private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype {
-
+ private class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
+ @GuardedBy("this")
+ private SnapshotBackedWriteTransaction allocatedTransaction;
+ @GuardedBy("this")
+ private DataTreeSnapshot readySnapshot;
@GuardedBy("this")
- private SnapshotBackedWriteTransaction latestOutstandingTx;
-
private boolean chainFailed = false;
+ @GuardedBy("this")
private void checkFailed() {
Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
}
- @Override
- public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
- final DataTreeSnapshot snapshot;
+ @GuardedBy("this")
+ private DataTreeSnapshot getSnapshot() {
checkFailed();
- if (latestOutstandingTx != null) {
- checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = latestOutstandingTx.getMutatedView();
+
+ if (allocatedTransaction != null) {
+ Preconditions.checkState(readySnapshot != null, "Previous transaction %s is not ready yet", allocatedTransaction.getIdentifier());
+ return readySnapshot;
} else {
- snapshot = dataTree.takeSnapshot();
+ return dataTree.takeSnapshot();
}
+ }
+
+ @GuardedBy("this")
+ private <T extends SnapshotBackedWriteTransaction> T recordTransaction(final T transaction) {
+ allocatedTransaction = transaction;
+ readySnapshot = null;
+ return transaction;
+ }
+
+ @Override
+ public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
+ final DataTreeSnapshot snapshot = getSnapshot();
return new SnapshotBackedReadTransaction(nextIdentifier(), getDebugTransactions(), snapshot);
}
@Override
public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
- final DataTreeSnapshot snapshot;
- checkFailed();
- if (latestOutstandingTx != null) {
- checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = latestOutstandingTx.getMutatedView();
- } else {
- snapshot = dataTree.takeSnapshot();
- }
- final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
- getDebugTransactions(), snapshot, this);
- latestOutstandingTx = ret;
- return ret;
+ final DataTreeSnapshot snapshot = getSnapshot();
+ return recordTransaction(new SnapshotBackedReadWriteTransaction(nextIdentifier(),
+ getDebugTransactions(), snapshot, this));
}
@Override
public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
- final DataTreeSnapshot snapshot;
- checkFailed();
- if (latestOutstandingTx != null) {
- checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = latestOutstandingTx.getMutatedView();
- } else {
- snapshot = dataTree.takeSnapshot();
+ final DataTreeSnapshot snapshot = getSnapshot();
+ return recordTransaction(new SnapshotBackedWriteTransaction(nextIdentifier(),
+ getDebugTransactions(), snapshot, this));
+ }
+
+ @Override
+ protected synchronized void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+ if (tx.equals(allocatedTransaction)) {
+ Preconditions.checkState(readySnapshot == null, "Unexpected abort of transaction %s with ready snapshot %s", tx, readySnapshot);
+ allocatedTransaction = null;
}
- final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(),
- getDebugTransactions(), snapshot, this);
- latestOutstandingTx = ret;
- return ret;
}
@Override
- public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) {
- DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx);
- return new ChainedTransactionCommitImpl(tx, storeCohort, this);
+ protected synchronized DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
+ Preconditions.checkState(tx.equals(allocatedTransaction), "Mis-ordered ready transaction %s last allocated was %s", tx, allocatedTransaction);
+ if (readySnapshot != null) {
+ // The snapshot should have been cleared
+ LOG.warn("Uncleared snapshot {} encountered, overwritten with transaction {} snapshot {}", readySnapshot, tx, tree);
+ }
+
+ final DOMStoreThreePhaseCommitCohort cohort = InMemoryDOMDataStore.this.transactionReady(tx, tree);
+ readySnapshot = tree;
+ return new ChainedTransactionCommitImpl(tx, cohort, this);
}
@Override
public void close() {
-
// FIXME: this call doesn't look right here - listeningExecutor is shared and owned
// by the outer class.
//listeningExecutor.shutdownNow();
protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
final Throwable t) {
chainFailed = true;
-
}
public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
- // If committed transaction is latestOutstandingTx we clear
- // latestOutstandingTx
- // field in order to base new transactions on Datastore Data Tree
- // directly.
- if (transaction.equals(latestOutstandingTx)) {
- latestOutstandingTx = null;
+ // If the committed transaction was the one we allocated last,
+ // we clear it and the ready snapshot, so the next transaction
+ // allocated refers to the data tree directly.
+ if (transaction.equals(allocatedTransaction)) {
+ if (readySnapshot == null) {
+ LOG.warn("Transaction {} committed while no ready snapshot present", transaction);
+ }
+
+ allocatedTransaction = null;
+ readySnapshot = null;
}
}
-
}
private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
-
private final SnapshotBackedWriteTransaction transaction;
private final DOMStoreThreePhaseCommitCohort delegate;
-
private final DOMStoreTransactionChainImpl txChain;
protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
- super();
this.transaction = transaction;
this.delegate = delegate;
this.txChain = txChain;
public void onSuccess(final Void result) {
txChain.onTransactionCommited(transaction);
}
-
});
return commitFuture;
}
-
}
private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
-
private final SnapshotBackedWriteTransaction transaction;
private final DataTreeModification modification;
private ResolveDataChangeEventsTask listenerResolver;
private DataTreeCandidate candidate;
- public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
+ public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction, final DataTreeModification modification) {
this.transaction = writeTransaction;
- this.modification = transaction.getMutatedView();
+ this.modification = modification;
}
@Override
* The commit has to occur atomically with regard to listener
* registrations.
*/
- synchronized (this) {
+ synchronized (InMemoryDOMDataStore.this) {
dataTree.commit(candidate);
listenerResolver.resolve(dataChangeListenerNotificationManager);
}
private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, DataTreeModification> TREE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, DataTreeModification.class, "mutableTree");
- private volatile TransactionReadyPrototype readyImpl; // non-null when not ready
- private volatile DataTreeModification mutableTree; // non-null when not committed/closed
+ // non-null when not ready
+ private volatile TransactionReadyPrototype readyImpl;
+ // non-null when not committed/closed
+ private volatile DataTreeModification mutableTree;
/**
* Creates new write-only transaction.
checkState(wasReady != null, "Transaction %s is no longer open", getIdentifier());
LOG.debug("Store transaction: {} : Ready", getIdentifier());
- mutableTree.ready();
- return wasReady.ready(this);
+
+ final DataTreeModification tree = mutableTree;
+ TREE_UPDATER.lazySet(this, null);
+ tree.ready();
+ return wasReady.transactionReady(this, tree);
}
@Override
if (wasReady != null) {
LOG.debug("Store transaction: {} : Closed", getIdentifier());
TREE_UPDATER.lazySet(this, null);
+ wasReady.transactionAborted(this);
} else {
LOG.debug("Store transaction: {} : Closed after submit", getIdentifier());
}
return toStringHelper.add("ready", readyImpl == null);
}
- // FIXME: used by chaining on, which really wants an mutated view with a precondition
- final boolean isReady() {
- return readyImpl == null;
- }
-
- protected DataTreeModification getMutatedView() {
- return mutableTree;
- }
-
/**
* Prototype implementation of
* {@link #ready(org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction)}
* providing underlying logic for applying implementation.
*
*/
- // FIXME: needs access to local stuff, so make it an abstract class
- public static interface TransactionReadyPrototype {
+ abstract static class TransactionReadyPrototype {
+ /**
+ * Called when a transaction is closed without being readied. This is not invoked for
+ * transactions which are ready.
+ *
+ * @param tx Transaction which got aborted.
+ */
+ protected abstract void transactionAborted(final SnapshotBackedWriteTransaction tx);
+
/**
* Returns a commit coordinator associated with supplied transactions.
*
*
* @param tx
* Transaction on which ready was invoked.
+ * @param tree
+ * Modified data tree which has been constructed.
* @return DOMStoreThreePhaseCommitCohort associated with transaction
*/
- DOMStoreThreePhaseCommitCohort ready(SnapshotBackedWriteTransaction tx);
+ protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction tx, DataTreeModification tree);
}
}
\ No newline at end of file