CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ConcurrentDOMDataBroker.java
index 13334c927236cdbeba31bf82ee10fb4f7d0cc952..aa5040fb11215711af2a511621d0831b8cd0c138 100644 (file)
@@ -8,23 +8,21 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.AbstractListeningExecutorService;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.broker.impl.AbstractDOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -34,13 +32,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of DOMDataCommitExecutor that coordinates transaction commits concurrently. The 3
+ * ConcurrentDOMDataBroker commits transactions concurrently. The 3
  * commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking
  * (ie async) per transaction but multiple transaction commits can run concurrent.
  *
  * @author Thomas Pantelis
  */
-public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
+public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
     private static final String CAN_COMMIT = "CAN_COMMIT";
     private static final String PRE_COMMIT = "PRE_COMMIT";
@@ -51,15 +49,9 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
     /**
      * This executor is used to execute Future listener callback Runnables async.
      */
-    private final ExecutorService clientFutureCallbackExecutor;
+    private final Executor clientFutureCallbackExecutor;
 
-    /**
-     * This executor is re-used internally in calls to Futures#addCallback to avoid the overhead
-     * of Futures#addCallback creating a MoreExecutors#sameThreadExecutor for each call.
-     */
-    private final ExecutorService internalFutureCallbackExecutor = new SimpleSameThreadExecutor();
-
-    public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, ExecutorService listenableFutureExecutor) {
+    public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, Executor listenableFutureExecutor) {
         super(datastores);
         this.clientFutureCallbackExecutor = Preconditions.checkNotNull(listenableFutureExecutor);
     }
@@ -69,18 +61,21 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
     }
 
     @Override
-    public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
-            Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
+    protected CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
+            Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
 
         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
 
-        final int cohortSize = Iterables.size(cohorts);
+        if(cohorts.isEmpty()){
+            return Futures.immediateCheckedFuture(null);
+        }
+
         final AsyncNotifyingSettableFuture clientSubmitFuture =
                 new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor);
 
-        doCanCommit(clientSubmitFuture, transaction, cohorts, cohortSize);
+        doCanCommit(clientSubmitFuture, transaction, cohorts);
 
         return MappingCheckedFuture.create(clientSubmitFuture,
                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
@@ -88,113 +83,119 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
 
     private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture,
             final DOMDataWriteTransaction transaction,
-            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final int cohortSize) {
+            final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
 
         final long startTime = System.nanoTime();
 
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohortSize);
         FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
                 if (result == null || !result) {
-                    handleException(clientSubmitFuture, transaction, cohorts, cohortSize,
+                    handleException(clientSubmitFuture, transaction, cohorts,
                             CAN_COMMIT, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER,
                             new TransactionCommitFailedException(
                                             "Can Commit failed, no detailed cause available."));
                 } else {
-                    if(remaining.decrementAndGet() == 0) {
+                    if(!cohortIterator.hasNext()) {
                         // All cohorts completed successfully - we can move on to the preCommit phase
-                        doPreCommit(startTime, clientSubmitFuture, transaction, cohorts, cohortSize);
+                        doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
+                    } else {
+                        ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+                        Futures.addCallback(canCommitFuture, this, MoreExecutors.directExecutor());
                     }
                 }
             }
 
             @Override
             public void onFailure(Throwable t) {
-                handleException(clientSubmitFuture, transaction, cohorts, cohortSize, CAN_COMMIT,
+                handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT,
                         TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER, t);
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Boolean> canCommitFuture = cohort.canCommit();
-            Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+        Futures.addCallback(canCommitFuture, futureCallback, MoreExecutors.directExecutor());
     }
 
     private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
             final DOMDataWriteTransaction transaction,
-            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final int cohortSize) {
+            final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
 
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohortSize);
         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void notUsed) {
-                if(remaining.decrementAndGet() == 0) {
+                if(!cohortIterator.hasNext()) {
                     // All cohorts completed successfully - we can move on to the commit phase
-                    doCommit(startTime, clientSubmitFuture, transaction, cohorts, cohortSize);
+                    doCommit(startTime, clientSubmitFuture, transaction, cohorts);
+                } else {
+                    ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+                    Futures.addCallback(preCommitFuture, this, MoreExecutors.directExecutor());
                 }
             }
 
             @Override
             public void onFailure(Throwable t) {
-                handleException(clientSubmitFuture, transaction, cohorts, cohortSize, PRE_COMMIT,
+                handleException(clientSubmitFuture, transaction, cohorts, PRE_COMMIT,
                         TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER, t);
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Void> preCommitFuture = cohort.preCommit();
-            Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+        Futures.addCallback(preCommitFuture, futureCallback, MoreExecutors.directExecutor());
     }
 
     private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
             final DOMDataWriteTransaction transaction,
-            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final int cohortSize) {
+            final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
 
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohortSize);
         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void notUsed) {
-                if(remaining.decrementAndGet() == 0) {
+                if(!cohortIterator.hasNext()) {
                     // All cohorts completed successfully - we're done.
                     commitStatsTracker.addDuration(System.nanoTime() - startTime);
 
                     clientSubmitFuture.set();
+                } else {
+                    ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+                    Futures.addCallback(commitFuture, this, MoreExecutors.directExecutor());
                 }
             }
 
             @Override
             public void onFailure(Throwable t) {
-                handleException(clientSubmitFuture, transaction, cohorts, cohortSize, COMMIT,
+                handleException(clientSubmitFuture, transaction, cohorts, COMMIT,
                         TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER, t);
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Void> commitFuture = cohort.commit();
-            Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+        Futures.addCallback(commitFuture, futureCallback, MoreExecutors.directExecutor());
     }
 
-    private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
+    private static void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
             final DOMDataWriteTransaction transaction,
-            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, int cohortSize,
+            final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
             final String phase, final TransactionCommitFailedExceptionMapper exMapper,
             final Throwable t) {
 
-        if(clientSubmitFuture.isDone()) {
+        if (clientSubmitFuture.isDone()) {
             // We must have had failures from multiple cohorts.
             return;
         }
 
         LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
-        Exception e;
-        if(t instanceof Exception) {
+        final Exception e;
+        if (t instanceof Exception) {
             e = (Exception)t;
         } else {
             e = new RuntimeException("Unexpected error occurred", t);
@@ -205,9 +206,9 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
         // Transaction failed - tell all cohorts to abort.
 
         @SuppressWarnings("unchecked")
-        ListenableFuture<Void>[] canCommitFutures = new ListenableFuture[cohortSize];
+        ListenableFuture<Void>[] canCommitFutures = new ListenableFuture[cohorts.size()];
         int i = 0;
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
+        for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
             canCommitFutures[i++] = cohort.abort();
         }
 
@@ -227,7 +228,7 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
                 // what's interesting to the client.
                 clientSubmitFuture.setException(clientException);
             }
-        }, internalFutureCallbackExecutor);
+        }, MoreExecutors.directExecutor());
     }
 
     /**
@@ -248,10 +249,10 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
          */
         private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<Boolean>();
 
-        private final ExecutorService listenerExecutor;
+        private final Executor listenerExecutor;
 
-        AsyncNotifyingSettableFuture(ExecutorService listenerExecutor) {
-            this.listenerExecutor = listenerExecutor;
+        AsyncNotifyingSettableFuture(Executor listenerExecutor) {
+            this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
         }
 
         @Override
@@ -312,41 +313,4 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
             }
         }
     }
-
-    /**
-     * A simple same-thread executor without the internal locking overhead that
-     * MoreExecutors#sameThreadExecutor has. The #execute method is the only one of concern - we
-     * don't shutdown the executor so the other methods irrelevant.
-     */
-    private static class SimpleSameThreadExecutor extends AbstractListeningExecutorService {
-
-        @Override
-        public void execute(Runnable command) {
-            command.run();
-        }
-
-        @Override
-        public boolean awaitTermination(long arg0, TimeUnit arg1) throws InterruptedException {
-            return true;
-        }
-
-        @Override
-        public boolean isShutdown() {
-            return false;
-        }
-
-        @Override
-        public boolean isTerminated() {
-            return false;
-        }
-
-        @Override
-        public void shutdown() {
-        }
-
-        @Override
-        public List<Runnable> shutdownNow() {
-            return null;
-        }
-    }
 }