Merge changes I9b32ab30,Iebe4701f
authorTony Tkacik <ttkacik@cisco.com>
Sat, 13 Sep 2014 12:46:44 +0000 (12:46 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 13 Sep 2014 12:46:44 +0000 (12:46 +0000)
* changes:
  BUG-650: speed CommitCoordinationTask up
  BUG-650: improve transaction chaining performance

opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedWriteTransaction.java

index d796ab35fa88522bd7f89d4802adaffe9c3d0a8e..77cf105ed6a6e676819593dd19ccfcdc8580897d 100644 (file)
@@ -17,7 +17,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
-import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -127,20 +127,17 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
     }
 
     /**
-     *
      * Implementation of blocking three-phase commit-coordination tasks without
-     * support of cancelation.
-     *
+     * support of cancellation.
      */
-    private static class CommitCoordinationTask implements Callable<Void> {
-
+    private static final class CommitCoordinationTask implements Callable<Void> {
+        private static final AtomicReferenceFieldUpdater<CommitCoordinationTask, CommitPhase> PHASE_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
         private final DOMDataWriteTransaction tx;
         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
         private final DurationStatsTracker commitStatTracker;
         private final int cohortSize;
-
-        @GuardedBy("this")
-        private CommitPhase currentPhase;
+        private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
 
         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
@@ -148,26 +145,26 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
                 final DurationStatsTracker commitStatTracker) {
             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
-            this.currentPhase = CommitPhase.SUBMITTED;
             this.commitStatTracker = commitStatTracker;
             this.cohortSize = Iterables.size(cohorts);
         }
 
         @Override
         public Void call() throws TransactionCommitFailedException {
+            final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
 
-            long startTime = System.nanoTime();
             try {
                 canCommitBlocking();
                 preCommitBlocking();
                 commitBlocking();
                 return null;
             } catch (TransactionCommitFailedException e) {
-                LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
-                abortBlocking(e);
+                final CommitPhase phase = currentPhase;
+                LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
+                abortBlocking(e, phase);
                 throw e;
             } finally {
-                if(commitStatTracker != null) {
+                if (commitStatTracker != null) {
                     commitStatTracker.addDuration(System.nanoTime() - startTime);
                 }
             }
@@ -331,18 +328,19 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          *            Exception which should be used to fail transaction for
          *            consumers of transaction
          *            future and listeners of transaction failure.
+         * @param phase phase in which the problem ensued
          * @throws TransactionCommitFailedException
          *             on invocation of this method.
          *             originalCa
          * @throws IllegalStateException
          *             if abort failed.
          */
-        private void abortBlocking(final TransactionCommitFailedException originalCause)
+        private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase)
                 throws TransactionCommitFailedException {
-            LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
+            LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause);
             Exception cause = originalCause;
             try {
-                abortAsyncAll().get();
+                abortAsyncAll(phase).get();
             } catch (InterruptedException | ExecutionException e) {
                 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
                 cause = new IllegalStateException("Abort failed.", e);
@@ -352,17 +350,15 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
         }
 
         /**
-         *
          * Invokes abort on underlying cohorts and returns future which
-         * completes
-         * once all abort on cohorts are completed.
+         * completes once all abort on cohorts are completed.
          *
+         * @param phase phase in which the problem ensued
          * @return Future which will complete once all cohorts completed
          *         abort.
-         *
          */
-        private ListenableFuture<Void> abortAsyncAll() {
-            changeStateFrom(currentPhase, CommitPhase.ABORT);
+        private ListenableFuture<Void> abortAsyncAll(final CommitPhase phase) {
+            changeStateFrom(phase, CommitPhase.ABORT);
 
             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
             int i = 0;
@@ -371,7 +367,7 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
             }
 
             /*
-             * We are returing all futures as list, not only succeeded ones in
+             * We are returning all futures as list, not only succeeded ones in
              * order to fail composite future if any of them failed.
              * See Futures.allAsList for this description.
              */
@@ -399,14 +395,13 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          * @throws IllegalStateException
          *             If currentState of task does not match expected state
          */
-        private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
-            Preconditions.checkState(currentPhase.equals(currentExpected),
-                    "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
-                    currentPhase, newState);
-            LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
-            currentPhase = newState;
-        };
+        private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
+            final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState);
+            Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s",
+                tx.getIdentifier(), currentExpected, currentPhase, newState);
 
+            LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState);
+        };
     }
 
 }
index a7bdd1e801a05e55e51495c61575b5a5a33b897e..662d48afdb2c33681aa4815c64f2a6992ac6329b 100644 (file)
@@ -34,10 +34,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * transactions.
  *
  */
-
-class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMStoreReadWriteTransaction> implements
-        DOMDataReadWriteTransaction {
-
+final class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMStoreReadWriteTransaction> implements DOMDataReadWriteTransaction {
     protected DOMForwardedReadWriteTransaction(final Object identifier,
             final Map<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
             final DOMDataCommitImplementation commitImpl) {
@@ -50,7 +47,8 @@ class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMS
         return getSubtransaction(store).read(path);
     }
 
-    @Override public CheckedFuture<Boolean, ReadFailedException> exists(
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(
         final LogicalDatastoreType store,
         final YangInstanceIdentifier path) {
         return getSubtransaction(store).exists(path);
index 3e748618169889cdfcf0b9f8eb66c73e65cb4338..3d61c7b6b65b3816bcc12247ace3f3b57656177e 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import static com.google.common.base.Preconditions.checkState;
-
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
@@ -62,8 +61,7 @@ import org.slf4j.LoggerFactory;
  * to implement {@link DOMStore} contract.
  *
  */
-public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
-        TransactionReadyPrototype,AutoCloseable {
+public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
     private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
 
@@ -114,7 +112,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
                         "DataChangeListenerQueueMgr");
     }
 
-    public void setCloseable(AutoCloseable closeable) {
+    public void setCloseable(final AutoCloseable closeable) {
         this.closeable = closeable;
     }
 
@@ -215,80 +213,95 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
     }
 
     @Override
-    public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction writeTx) {
-        LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView());
-        return new ThreePhaseCommitImpl(writeTx);
+    protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+        LOG.debug("Tx: {} is closed.", tx.getIdentifier());
+    }
+
+    @Override
+    protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
+        LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), tree);
+        return new ThreePhaseCommitImpl(tx, tree);
     }
 
     private Object nextIdentifier() {
         return name + "-" + txCounter.getAndIncrement();
     }
 
-    private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype {
-
+    private class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
+        @GuardedBy("this")
+        private SnapshotBackedWriteTransaction allocatedTransaction;
+        @GuardedBy("this")
+        private DataTreeSnapshot readySnapshot;
         @GuardedBy("this")
-        private SnapshotBackedWriteTransaction latestOutstandingTx;
-
         private boolean chainFailed = false;
 
+        @GuardedBy("this")
         private void checkFailed() {
             Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
         }
 
-        @Override
-        public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
-            final DataTreeSnapshot snapshot;
+        @GuardedBy("this")
+        private DataTreeSnapshot getSnapshot() {
             checkFailed();
-            if (latestOutstandingTx != null) {
-                checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
-                snapshot = latestOutstandingTx.getMutatedView();
+
+            if (allocatedTransaction != null) {
+                Preconditions.checkState(readySnapshot != null, "Previous transaction %s is not ready yet", allocatedTransaction.getIdentifier());
+                return readySnapshot;
             } else {
-                snapshot = dataTree.takeSnapshot();
+                return dataTree.takeSnapshot();
             }
+        }
+
+        @GuardedBy("this")
+        private <T extends SnapshotBackedWriteTransaction> T recordTransaction(final T transaction) {
+            allocatedTransaction = transaction;
+            readySnapshot = null;
+            return transaction;
+        }
+
+        @Override
+        public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
+            final DataTreeSnapshot snapshot = getSnapshot();
             return new SnapshotBackedReadTransaction(nextIdentifier(), getDebugTransactions(), snapshot);
         }
 
         @Override
         public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
-            final DataTreeSnapshot snapshot;
-            checkFailed();
-            if (latestOutstandingTx != null) {
-                checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
-                snapshot = latestOutstandingTx.getMutatedView();
-            } else {
-                snapshot = dataTree.takeSnapshot();
-            }
-            final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
-                    getDebugTransactions(), snapshot, this);
-            latestOutstandingTx = ret;
-            return ret;
+            final DataTreeSnapshot snapshot = getSnapshot();
+            return recordTransaction(new SnapshotBackedReadWriteTransaction(nextIdentifier(),
+                    getDebugTransactions(), snapshot, this));
         }
 
         @Override
         public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
-            final DataTreeSnapshot snapshot;
-            checkFailed();
-            if (latestOutstandingTx != null) {
-                checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
-                snapshot = latestOutstandingTx.getMutatedView();
-            } else {
-                snapshot = dataTree.takeSnapshot();
+            final DataTreeSnapshot snapshot = getSnapshot();
+            return recordTransaction(new SnapshotBackedWriteTransaction(nextIdentifier(),
+                    getDebugTransactions(), snapshot, this));
+        }
+
+        @Override
+        protected synchronized void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+            if (tx.equals(allocatedTransaction)) {
+                Preconditions.checkState(readySnapshot == null, "Unexpected abort of transaction %s with ready snapshot %s", tx, readySnapshot);
+                allocatedTransaction = null;
             }
-            final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(),
-                    getDebugTransactions(), snapshot, this);
-            latestOutstandingTx = ret;
-            return ret;
         }
 
         @Override
-        public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) {
-            DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx);
-            return new ChainedTransactionCommitImpl(tx, storeCohort, this);
+        protected synchronized DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
+            Preconditions.checkState(tx.equals(allocatedTransaction), "Mis-ordered ready transaction %s last allocated was %s", tx, allocatedTransaction);
+            if (readySnapshot != null) {
+                // The snapshot should have been cleared
+                LOG.warn("Uncleared snapshot {} encountered, overwritten with transaction {} snapshot {}", readySnapshot, tx, tree);
+            }
+
+            final DOMStoreThreePhaseCommitCohort cohort = InMemoryDOMDataStore.this.transactionReady(tx, tree);
+            readySnapshot = tree;
+            return new ChainedTransactionCommitImpl(tx, cohort, this);
         }
 
         @Override
         public void close() {
-
             // FIXME: this call doesn't look right here - listeningExecutor is shared and owned
             // by the outer class.
             //listeningExecutor.shutdownNow();
@@ -297,31 +310,30 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
                 final Throwable t) {
             chainFailed = true;
-
         }
 
         public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
-            // If committed transaction is latestOutstandingTx we clear
-            // latestOutstandingTx
-            // field in order to base new transactions on Datastore Data Tree
-            // directly.
-            if (transaction.equals(latestOutstandingTx)) {
-                latestOutstandingTx = null;
+            // If the committed transaction was the one we allocated last,
+            // we clear it and the ready snapshot, so the next transaction
+            // allocated refers to the data tree directly.
+            if (transaction.equals(allocatedTransaction)) {
+                if (readySnapshot == null) {
+                    LOG.warn("Transaction {} committed while no ready snapshot present", transaction);
+                }
+
+                allocatedTransaction = null;
+                readySnapshot = null;
             }
         }
-
     }
 
     private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
-
         private final SnapshotBackedWriteTransaction transaction;
         private final DOMStoreThreePhaseCommitCohort delegate;
-
         private final DOMStoreTransactionChainImpl txChain;
 
         protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
                 final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
-            super();
             this.transaction = transaction;
             this.delegate = delegate;
             this.txChain = txChain;
@@ -355,24 +367,21 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
                 public void onSuccess(final Void result) {
                     txChain.onTransactionCommited(transaction);
                 }
-
             });
             return commitFuture;
         }
-
     }
 
     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
-
         private final SnapshotBackedWriteTransaction transaction;
         private final DataTreeModification modification;
 
         private ResolveDataChangeEventsTask listenerResolver;
         private DataTreeCandidate candidate;
 
-        public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
+        public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction, final DataTreeModification modification) {
             this.transaction = writeTransaction;
-            this.modification = transaction.getMutatedView();
+            this.modification = modification;
         }
 
         @Override
@@ -425,7 +434,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
              * The commit has to occur atomically with regard to listener
              * registrations.
              */
-            synchronized (this) {
+            synchronized (InMemoryDOMDataStore.this) {
                 dataTree.commit(candidate);
                 listenerResolver.resolve(dataChangeListenerNotificationManager);
             }
index 5f102047684f595a81adc12ddd18866f4fbcd9d7..60a23403b3ac7fce67b01b85295752cdbb2a2e6e 100644 (file)
@@ -35,8 +35,10 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction impleme
     private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, DataTreeModification> TREE_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, DataTreeModification.class, "mutableTree");
 
-    private volatile TransactionReadyPrototype readyImpl;        // non-null when not ready
-    private volatile DataTreeModification mutableTree;           // non-null when not committed/closed
+    // non-null when not ready
+    private volatile TransactionReadyPrototype readyImpl;
+    // non-null when not committed/closed
+    private volatile DataTreeModification mutableTree;
 
     /**
      * Creates new write-only transaction.
@@ -137,8 +139,11 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction impleme
         checkState(wasReady != null, "Transaction %s is no longer open", getIdentifier());
 
         LOG.debug("Store transaction: {} : Ready", getIdentifier());
-        mutableTree.ready();
-        return wasReady.ready(this);
+
+        final DataTreeModification tree = mutableTree;
+        TREE_UPDATER.lazySet(this, null);
+        tree.ready();
+        return wasReady.transactionReady(this, tree);
     }
 
     @Override
@@ -147,6 +152,7 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction impleme
         if (wasReady != null) {
             LOG.debug("Store transaction: {} : Closed", getIdentifier());
             TREE_UPDATER.lazySet(this, null);
+            wasReady.transactionAborted(this);
         } else {
             LOG.debug("Store transaction: {} : Closed after submit", getIdentifier());
         }
@@ -157,15 +163,6 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction impleme
         return toStringHelper.add("ready", readyImpl == null);
     }
 
-    // FIXME: used by chaining on, which really wants an mutated view with a precondition
-    final boolean isReady() {
-        return readyImpl == null;
-    }
-
-    protected DataTreeModification getMutatedView() {
-        return mutableTree;
-    }
-
     /**
      * Prototype implementation of
      * {@link #ready(org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction)}
@@ -175,8 +172,15 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction impleme
      * providing underlying logic for applying implementation.
      *
      */
-    // FIXME: needs access to local stuff, so make it an abstract class
-    public static interface TransactionReadyPrototype {
+    abstract static class TransactionReadyPrototype {
+        /**
+         * Called when a transaction is closed without being readied. This is not invoked for
+         * transactions which are ready.
+         *
+         * @param tx Transaction which got aborted.
+         */
+        protected abstract void transactionAborted(final SnapshotBackedWriteTransaction tx);
+
         /**
          * Returns a commit coordinator associated with supplied transactions.
          *
@@ -184,8 +188,10 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction impleme
          *
          * @param tx
          *            Transaction on which ready was invoked.
+         * @param tree
+         *            Modified data tree which has been constructed.
          * @return DOMStoreThreePhaseCommitCohort associated with transaction
          */
-        DOMStoreThreePhaseCommitCohort ready(SnapshotBackedWriteTransaction tx);
+        protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction tx, DataTreeModification tree);
     }
 }
\ No newline at end of file