From 9357909cc50b5639bd65fb6597a2b40a6b5c06cf Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 12 Sep 2014 22:47:52 +0200 Subject: [PATCH] BUG-650: speed CommitCoordinationTask up Eliminates synchronized block in favor of a compare-and-swap -- the logic was doing precisely that anyway. Change-Id: I9b32ab303eb718e8a0af52526857eead65c2b697 Signed-off-by: Robert Varga --- .../impl/DOMDataCommitCoordinatorImpl.java | 57 +++++++++---------- .../DOMForwardedReadWriteTransaction.java | 8 +-- 2 files changed, 29 insertions(+), 36 deletions(-) diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java index d796ab35fa..77cf105ed6 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -17,7 +17,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; -import javax.annotation.concurrent.GuardedBy; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -127,20 +127,17 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } /** - * * Implementation of blocking three-phase commit-coordination tasks without - * support of cancelation. - * + * support of cancellation. */ - private static class CommitCoordinationTask implements Callable { - + private static final class CommitCoordinationTask implements Callable { + private static final AtomicReferenceFieldUpdater PHASE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase"); private final DOMDataWriteTransaction tx; private final Iterable cohorts; private final DurationStatsTracker commitStatTracker; private final int cohortSize; - - @GuardedBy("this") - private CommitPhase currentPhase; + private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED; public CommitCoordinationTask(final DOMDataWriteTransaction transaction, final Iterable cohorts, @@ -148,26 +145,26 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { final DurationStatsTracker commitStatTracker) { this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); - this.currentPhase = CommitPhase.SUBMITTED; this.commitStatTracker = commitStatTracker; this.cohortSize = Iterables.size(cohorts); } @Override public Void call() throws TransactionCommitFailedException { + final long startTime = commitStatTracker != null ? System.nanoTime() : 0; - long startTime = System.nanoTime(); try { canCommitBlocking(); preCommitBlocking(); commitBlocking(); return null; } catch (TransactionCommitFailedException e) { - LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e); - abortBlocking(e); + final CommitPhase phase = currentPhase; + LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e); + abortBlocking(e, phase); throw e; } finally { - if(commitStatTracker != null) { + if (commitStatTracker != null) { commitStatTracker.addDuration(System.nanoTime() - startTime); } } @@ -331,18 +328,19 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { * Exception which should be used to fail transaction for * consumers of transaction * future and listeners of transaction failure. + * @param phase phase in which the problem ensued * @throws TransactionCommitFailedException * on invocation of this method. * originalCa * @throws IllegalStateException * if abort failed. */ - private void abortBlocking(final TransactionCommitFailedException originalCause) + private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase) throws TransactionCommitFailedException { - LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause); + LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause); Exception cause = originalCause; try { - abortAsyncAll().get(); + abortAsyncAll(phase).get(); } catch (InterruptedException | ExecutionException e) { LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e); cause = new IllegalStateException("Abort failed.", e); @@ -352,17 +350,15 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } /** - * * Invokes abort on underlying cohorts and returns future which - * completes - * once all abort on cohorts are completed. + * completes once all abort on cohorts are completed. * + * @param phase phase in which the problem ensued * @return Future which will complete once all cohorts completed * abort. - * */ - private ListenableFuture abortAsyncAll() { - changeStateFrom(currentPhase, CommitPhase.ABORT); + private ListenableFuture abortAsyncAll(final CommitPhase phase) { + changeStateFrom(phase, CommitPhase.ABORT); final ListenableFuture[] ops = new ListenableFuture[cohortSize]; int i = 0; @@ -371,7 +367,7 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } /* - * We are returing all futures as list, not only succeeded ones in + * We are returning all futures as list, not only succeeded ones in * order to fail composite future if any of them failed. * See Futures.allAsList for this description. */ @@ -399,14 +395,13 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { * @throws IllegalStateException * If currentState of task does not match expected state */ - private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) { - Preconditions.checkState(currentPhase.equals(currentExpected), - "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(), - currentPhase, newState); - LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState); - currentPhase = newState; - }; + private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) { + final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState); + Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s", + tx.getIdentifier(), currentExpected, currentPhase, newState); + LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState); + }; } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java index a7bdd1e801..662d48afdb 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java @@ -34,10 +34,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; * transactions. * */ - -class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction implements - DOMDataReadWriteTransaction { - +final class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction implements DOMDataReadWriteTransaction { protected DOMForwardedReadWriteTransaction(final Object identifier, final Map backingTxs, final DOMDataCommitImplementation commitImpl) { @@ -50,7 +47,8 @@ class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction exists( + @Override + public CheckedFuture exists( final LogicalDatastoreType store, final YangInstanceIdentifier path) { return getSubtransaction(store).exists(path); -- 2.36.6