BUG-650: speed CommitCoordinationTask up 23/11123/2
authorRobert Varga <rovarga@cisco.com>
Fri, 12 Sep 2014 20:47:52 +0000 (22:47 +0200)
committerRobert Varga <rovarga@cisco.com>
Sat, 13 Sep 2014 00:49:59 +0000 (02:49 +0200)
Eliminates synchronized block in favor of a compare-and-swap -- the
logic was doing precisely that anyway.

Change-Id: I9b32ab303eb718e8a0af52526857eead65c2b697
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java

index d796ab35fa88522bd7f89d4802adaffe9c3d0a8e..77cf105ed6a6e676819593dd19ccfcdc8580897d 100644 (file)
@@ -17,7 +17,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
-import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -127,20 +127,17 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
     }
 
     /**
-     *
      * Implementation of blocking three-phase commit-coordination tasks without
-     * support of cancelation.
-     *
+     * support of cancellation.
      */
-    private static class CommitCoordinationTask implements Callable<Void> {
-
+    private static final class CommitCoordinationTask implements Callable<Void> {
+        private static final AtomicReferenceFieldUpdater<CommitCoordinationTask, CommitPhase> PHASE_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
         private final DOMDataWriteTransaction tx;
         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
         private final DurationStatsTracker commitStatTracker;
         private final int cohortSize;
-
-        @GuardedBy("this")
-        private CommitPhase currentPhase;
+        private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
 
         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
@@ -148,26 +145,26 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
                 final DurationStatsTracker commitStatTracker) {
             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
-            this.currentPhase = CommitPhase.SUBMITTED;
             this.commitStatTracker = commitStatTracker;
             this.cohortSize = Iterables.size(cohorts);
         }
 
         @Override
         public Void call() throws TransactionCommitFailedException {
+            final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
 
-            long startTime = System.nanoTime();
             try {
                 canCommitBlocking();
                 preCommitBlocking();
                 commitBlocking();
                 return null;
             } catch (TransactionCommitFailedException e) {
-                LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
-                abortBlocking(e);
+                final CommitPhase phase = currentPhase;
+                LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
+                abortBlocking(e, phase);
                 throw e;
             } finally {
-                if(commitStatTracker != null) {
+                if (commitStatTracker != null) {
                     commitStatTracker.addDuration(System.nanoTime() - startTime);
                 }
             }
@@ -331,18 +328,19 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          *            Exception which should be used to fail transaction for
          *            consumers of transaction
          *            future and listeners of transaction failure.
+         * @param phase phase in which the problem ensued
          * @throws TransactionCommitFailedException
          *             on invocation of this method.
          *             originalCa
          * @throws IllegalStateException
          *             if abort failed.
          */
-        private void abortBlocking(final TransactionCommitFailedException originalCause)
+        private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase)
                 throws TransactionCommitFailedException {
-            LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
+            LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause);
             Exception cause = originalCause;
             try {
-                abortAsyncAll().get();
+                abortAsyncAll(phase).get();
             } catch (InterruptedException | ExecutionException e) {
                 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
                 cause = new IllegalStateException("Abort failed.", e);
@@ -352,17 +350,15 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
         }
 
         /**
-         *
          * Invokes abort on underlying cohorts and returns future which
-         * completes
-         * once all abort on cohorts are completed.
+         * completes once all abort on cohorts are completed.
          *
+         * @param phase phase in which the problem ensued
          * @return Future which will complete once all cohorts completed
          *         abort.
-         *
          */
-        private ListenableFuture<Void> abortAsyncAll() {
-            changeStateFrom(currentPhase, CommitPhase.ABORT);
+        private ListenableFuture<Void> abortAsyncAll(final CommitPhase phase) {
+            changeStateFrom(phase, CommitPhase.ABORT);
 
             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
             int i = 0;
@@ -371,7 +367,7 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
             }
 
             /*
-             * We are returing all futures as list, not only succeeded ones in
+             * We are returning all futures as list, not only succeeded ones in
              * order to fail composite future if any of them failed.
              * See Futures.allAsList for this description.
              */
@@ -399,14 +395,13 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          * @throws IllegalStateException
          *             If currentState of task does not match expected state
          */
-        private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
-            Preconditions.checkState(currentPhase.equals(currentExpected),
-                    "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
-                    currentPhase, newState);
-            LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
-            currentPhase = newState;
-        };
+        private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
+            final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState);
+            Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s",
+                tx.getIdentifier(), currentExpected, currentPhase, newState);
 
+            LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState);
+        };
     }
 
 }
index a7bdd1e801a05e55e51495c61575b5a5a33b897e..662d48afdb2c33681aa4815c64f2a6992ac6329b 100644 (file)
@@ -34,10 +34,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * transactions.
  *
  */
-
-class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMStoreReadWriteTransaction> implements
-        DOMDataReadWriteTransaction {
-
+final class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMStoreReadWriteTransaction> implements DOMDataReadWriteTransaction {
     protected DOMForwardedReadWriteTransaction(final Object identifier,
             final Map<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
             final DOMDataCommitImplementation commitImpl) {
@@ -50,7 +47,8 @@ class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMS
         return getSubtransaction(store).read(path);
     }
 
-    @Override public CheckedFuture<Boolean, ReadFailedException> exists(
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(
         final LogicalDatastoreType store,
         final YangInstanceIdentifier path) {
         return getSubtransaction(store).exists(path);