import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
-import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
}
/**
- *
* Implementation of blocking three-phase commit-coordination tasks without
- * support of cancelation.
- *
+ * support of cancellation.
*/
- private static class CommitCoordinationTask implements Callable<Void> {
-
+ private static final class CommitCoordinationTask implements Callable<Void> {
+ private static final AtomicReferenceFieldUpdater<CommitCoordinationTask, CommitPhase> PHASE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
private final DOMDataWriteTransaction tx;
private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
private final DurationStatsTracker commitStatTracker;
private final int cohortSize;
-
- @GuardedBy("this")
- private CommitPhase currentPhase;
+ private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
final DurationStatsTracker commitStatTracker) {
this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
- this.currentPhase = CommitPhase.SUBMITTED;
this.commitStatTracker = commitStatTracker;
this.cohortSize = Iterables.size(cohorts);
}
@Override
public Void call() throws TransactionCommitFailedException {
+ final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
- long startTime = System.nanoTime();
try {
canCommitBlocking();
preCommitBlocking();
commitBlocking();
return null;
} catch (TransactionCommitFailedException e) {
- LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
- abortBlocking(e);
+ final CommitPhase phase = currentPhase;
+ LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
+ abortBlocking(e, phase);
throw e;
} finally {
- if(commitStatTracker != null) {
+ if (commitStatTracker != null) {
commitStatTracker.addDuration(System.nanoTime() - startTime);
}
}
* Exception which should be used to fail transaction for
* consumers of transaction
* future and listeners of transaction failure.
+ * @param phase phase in which the problem ensued
* @throws TransactionCommitFailedException
* on invocation of this method.
* originalCa
* @throws IllegalStateException
* if abort failed.
*/
- private void abortBlocking(final TransactionCommitFailedException originalCause)
+ private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase)
throws TransactionCommitFailedException {
- LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
+ LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause);
Exception cause = originalCause;
try {
- abortAsyncAll().get();
+ abortAsyncAll(phase).get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
cause = new IllegalStateException("Abort failed.", e);
}
/**
- *
* Invokes abort on underlying cohorts and returns future which
- * completes
- * once all abort on cohorts are completed.
+ * completes once all abort on cohorts are completed.
*
+ * @param phase phase in which the problem ensued
* @return Future which will complete once all cohorts completed
* abort.
- *
*/
- private ListenableFuture<Void> abortAsyncAll() {
- changeStateFrom(currentPhase, CommitPhase.ABORT);
+ private ListenableFuture<Void> abortAsyncAll(final CommitPhase phase) {
+ changeStateFrom(phase, CommitPhase.ABORT);
final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
int i = 0;
}
/*
- * We are returing all futures as list, not only succeeded ones in
+ * We are returning all futures as list, not only succeeded ones in
* order to fail composite future if any of them failed.
* See Futures.allAsList for this description.
*/
* @throws IllegalStateException
* If currentState of task does not match expected state
*/
- private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
- Preconditions.checkState(currentPhase.equals(currentExpected),
- "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
- currentPhase, newState);
- LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
- currentPhase = newState;
- };
+ private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
+ final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState);
+ Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s",
+ tx.getIdentifier(), currentExpected, currentPhase, newState);
+ LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState);
+ };
}
}