Bug 2160: Added concurrent 3-phase commit coordinator 52/12052/1
authortpantelis <tpanteli@brocade.com>
Tue, 7 Oct 2014 13:34:01 +0000 (09:34 -0400)
committerMoiz Raja <moraja@cisco.com>
Fri, 17 Oct 2014 14:56:50 +0000 (14:56 +0000)
Added a new class DOMConcurrentDataCommitCoordinator which is an
implementation of DOMDataCommitExecutor that allows multiple concurrent
3-phase transaction commits. The 3 phases are still performed serially and
non-blocking per transaction.

A new flag, "allow-concurrent-commits", was added to the DOM in-memory
data broker's yang config. If true, the new
DOMConcurrentDataCommitCoordinator implementation is used, otherwise the
current implementation is used. The default is false.

The distributed data store will use the concurrent coordinator.

Change-Id: Ie21947d6f9c1086f89686a73442c854c08c07100
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinator.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/jmx/CommitStatsMXBeanImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/yang/opendaylight-dom-broker-impl.yang
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinatorTest.java [new file with mode: 0644]

index fbb666a9caab32c90646796078132d1ed13c8bfa..b57a3f5f0bede2b7d93a880150818251f6247b14 100644 (file)
@@ -29,6 +29,8 @@
                         <type xmlns:operational-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store">operational-dom-store-spi:operational-dom-datastore</type>
                         <name>distributed-operational-store-service</name>
                     </operational-data-store>
+                    
+                    <allow-concurrent-commits>true</allow-concurrent-commits>
                 </module>
 
                 <module>
index ac62974d290e5cb37744e39b085952a57aa28543..8f01a393c6eb28cb494fe89c063d5ab3085237be 100644 (file)
@@ -8,18 +8,24 @@
 package org.opendaylight.controller.config.yang.md.sal.dom.impl;
 
 import java.util.EnumMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
 import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMConcurrentDataCommitCoordinator;
 import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataCommitCoordinatorImpl;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataCommitExecutor;
 import org.opendaylight.controller.md.sal.dom.broker.impl.jmx.CommitStatsMXBeanImpl;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.yangtools.util.DurationStatisticsTracker;
 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import com.google.common.collect.Lists;
 
 /**
 *
@@ -65,17 +71,6 @@ public final class DomInmemoryDataBrokerModule extends
         datastores.put(LogicalDatastoreType.OPERATIONAL, operStore);
         datastores.put(LogicalDatastoreType.CONFIGURATION, configStore);
 
-        /*
-         * We use a single-threaded executor for commits with a bounded queue capacity. If the
-         * queue capacity is reached, subsequent commit tasks will be rejected and the commits will
-         * fail. This is done to relieve back pressure. This should be an extreme scenario - either
-         * there's deadlock(s) somewhere and the controller is unstable or some rogue component is
-         * continuously hammering commits too fast or the controller is just over-capacity for the
-         * system it's running on.
-         */
-        ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
-                getMaxDataBrokerCommitQueueSize(), "WriteTxCommit");
-
         /*
          * We use an executor for commit ListenableFuture callbacks that favors reusing available
          * threads over creating new threads at the expense of execution time. The assumption is
@@ -88,31 +83,65 @@ public final class DomInmemoryDataBrokerModule extends
                 getMaxDataBrokerFutureCallbackPoolSize(), getMaxDataBrokerFutureCallbackQueueSize(),
                 "CommitFutures");
 
-        DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores,
-                new DeadlockDetectingListeningExecutorService(commitExecutor,
-                    TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER,
-                    listenableFutureExecutor));
+        final List<AbstractMXBean> mBeans = Lists.newArrayList();
+
+        DOMDataCommitExecutor commitCoordinator;
+        DurationStatisticsTracker commitStatsTracker = null;
+
+        if(getAllowConcurrentCommits()) {
+            DOMConcurrentDataCommitCoordinator coordinator =
+                    new DOMConcurrentDataCommitCoordinator(listenableFutureExecutor);
+            commitStatsTracker = coordinator.getCommitStatsTracker();
+            commitCoordinator = coordinator;
+        } else {
+            /*
+             * We use a single-threaded executor for commits with a bounded queue capacity. If the
+             * queue capacity is reached, subsequent commit tasks will be rejected and the commits will
+             * fail. This is done to relieve back pressure. This should be an extreme scenario - either
+             * there's deadlock(s) somewhere and the controller is unstable or some rogue component is
+             * continuously hammering commits too fast or the controller is just over-capacity for the
+             * system it's running on.
+             */
+            ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
+                    getMaxDataBrokerCommitQueueSize(), "WriteTxCommit");
+
+            DOMDataCommitCoordinatorImpl coordinator = new DOMDataCommitCoordinatorImpl(
+                    new DeadlockDetectingListeningExecutorService(commitExecutor,
+                            TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER,
+                            listenableFutureExecutor));
+
+            commitStatsTracker = coordinator.getCommitStatsTracker();
+            commitCoordinator = coordinator;
+
+            final AbstractMXBean commitExecutorStatsMXBean =
+                    ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats",
+                            JMX_BEAN_TYPE, null);
+            if(commitExecutorStatsMXBean != null) {
+                mBeans.add(commitExecutorStatsMXBean);
+            }
+        }
 
-        final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl(
-                newDataBroker.getCommitStatsTracker(), JMX_BEAN_TYPE);
-        commitStatsMXBean.registerMBean();
+        DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, commitCoordinator);
+
+        if(commitStatsTracker != null) {
+            final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl(
+                    commitStatsTracker, JMX_BEAN_TYPE);
+            commitStatsMXBean.registerMBean();
+            mBeans.add(commitStatsMXBean);
+        }
 
-        final AbstractMXBean commitExecutorStatsMXBean =
-                ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats",
-                        JMX_BEAN_TYPE, null);
         final AbstractMXBean commitFutureStatsMXBean =
                 ThreadExecutorStatsMXBeanImpl.create(listenableFutureExecutor,
                         "CommitFutureExecutorStats", JMX_BEAN_TYPE, null);
+        if(commitFutureStatsMXBean != null) {
+            mBeans.add(commitFutureStatsMXBean);
+        }
 
         newDataBroker.setCloseable(new AutoCloseable() {
             @Override
             public void close() {
-                commitStatsMXBean.unregisterMBean();
-                if (commitExecutorStatsMXBean != null) {
-                    commitExecutorStatsMXBean.unregisterMBean();
-                }
-                if (commitFutureStatsMXBean != null) {
-                    commitFutureStatsMXBean.unregisterMBean();
+                for(AbstractMXBean mBean: mBeans) {
+                    mBean.unregisterMBean();
                 }
             }
         });
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinator.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinator.java
new file mode 100644 (file)
index 0000000..605d71d
--- /dev/null
@@ -0,0 +1,344 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.DurationStatisticsTracker;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.AbstractListeningExecutorService;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Implementation of DOMDataCommitExecutor that coordinates transaction commits concurrently. The 3
+ * commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking
+ * (ie async) per transaction but multiple transaction commits can run concurrent.
+ *
+ * @author Thomas Pantelis
+ */
+public class DOMConcurrentDataCommitCoordinator implements DOMDataCommitExecutor {
+
+    private static final String CAN_COMMIT = "CAN_COMMIT";
+    private static final String PRE_COMMIT = "PRE_COMMIT";
+    private static final String COMMIT = "COMMIT";
+
+    private static final Logger LOG = LoggerFactory.getLogger(DOMConcurrentDataCommitCoordinator.class);
+
+    private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
+
+    /**
+     * This executor is used to execute Future listener callback Runnables async.
+     */
+    private final ExecutorService clientFutureCallbackExecutor;
+
+    /**
+     * This executor is re-used internally in calls to Futures#addCallback to avoid the overhead
+     * of Futures#addCallback creating a MoreExecutors#sameThreadExecutor for each call.
+     */
+    private final ExecutorService internalFutureCallbackExecutor = new SimpleSameThreadExecutor();
+
+    public DOMConcurrentDataCommitCoordinator(ExecutorService listenableFutureExecutor) {
+        this.clientFutureCallbackExecutor = Preconditions.checkNotNull(listenableFutureExecutor);
+    }
+
+    public DurationStatisticsTracker getCommitStatsTracker() {
+        return commitStatsTracker;
+    }
+
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
+            Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
+
+        Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
+        Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
+        LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
+
+        final int cohortSize = Iterables.size(cohorts);
+        final AsyncNotifyingSettableFuture clientSubmitFuture =
+                new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor);
+
+        doCanCommit(clientSubmitFuture, transaction, cohorts, cohortSize);
+
+        return MappingCheckedFuture.create(clientSubmitFuture,
+                TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+    }
+
+    private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture,
+            final DOMDataWriteTransaction transaction,
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final int cohortSize) {
+
+        final long startTime = System.nanoTime();
+
+        // Not using Futures.allAsList here to avoid its internal overhead.
+        final AtomicInteger remaining = new AtomicInteger(cohortSize);
+        FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(Boolean result) {
+                if (result == null || !result) {
+                    handleException(clientSubmitFuture, transaction, cohorts, cohortSize,
+                            CAN_COMMIT, new TransactionCommitFailedException(
+                                            "Can Commit failed, no detailed cause available."));
+                } else {
+                    if(remaining.decrementAndGet() == 0) {
+                        // All cohorts completed successfully - we can move on to the preCommit phase
+                        doPreCommit(startTime, clientSubmitFuture, transaction, cohorts, cohortSize);
+                    }
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                handleException(clientSubmitFuture, transaction, cohorts, cohortSize, CAN_COMMIT, t);
+            }
+        };
+
+        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
+            ListenableFuture<Boolean> canCommitFuture = cohort.canCommit();
+            Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
+        }
+    }
+
+    private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
+            final DOMDataWriteTransaction transaction,
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final int cohortSize) {
+
+        // Not using Futures.allAsList here to avoid its internal overhead.
+        final AtomicInteger remaining = new AtomicInteger(cohortSize);
+        FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(Void notUsed) {
+                if(remaining.decrementAndGet() == 0) {
+                    // All cohorts completed successfully - we can move on to the commit phase
+                    doCommit(startTime, clientSubmitFuture, transaction, cohorts, cohortSize);
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                handleException(clientSubmitFuture, transaction, cohorts, cohortSize, CAN_COMMIT, t);
+            }
+        };
+
+        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
+            ListenableFuture<Void> preCommitFuture = cohort.preCommit();
+            Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
+        }
+    }
+
+    private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
+            final DOMDataWriteTransaction transaction,
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final int cohortSize) {
+
+        // Not using Futures.allAsList here to avoid its internal overhead.
+        final AtomicInteger remaining = new AtomicInteger(cohortSize);
+        FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(Void notUsed) {
+                if(remaining.decrementAndGet() == 0) {
+                    // All cohorts completed successfully - we're done.
+                    commitStatsTracker.addDuration(System.nanoTime() - startTime);
+
+                    clientSubmitFuture.set();
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                handleException(clientSubmitFuture, transaction, cohorts, cohortSize, CAN_COMMIT, t);
+            }
+        };
+
+        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
+            ListenableFuture<Void> commitFuture = cohort.commit();
+            Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
+        }
+    }
+
+    private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
+            final DOMDataWriteTransaction transaction,
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, int cohortSize,
+            final String phase, final Throwable t) {
+
+        if(clientSubmitFuture.isDone()) {
+            // We must have had failures from multiple cohorts.
+            return;
+        }
+
+        LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
+        Exception e;
+        if(t instanceof Exception) {
+            e = (Exception)t;
+        } else {
+            e = new RuntimeException("Unexpected error occurred", t);
+        }
+
+        final TransactionCommitFailedException clientException =
+                TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
+
+        // Transaction failed - tell all cohorts to abort.
+
+        @SuppressWarnings("unchecked")
+        ListenableFuture<Void>[] canCommitFutures = new ListenableFuture[cohortSize];
+        int i = 0;
+        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
+            canCommitFutures[i++] = cohort.abort();
+        }
+
+        ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(canCommitFutures);
+        Futures.addCallback(combinedFuture, new FutureCallback<List<Void>>() {
+            @Override
+            public void onSuccess(List<Void> notUsed) {
+                // Propagate the original exception to the client.
+                clientSubmitFuture.setException(clientException);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.error("Tx: {} Error during Abort.", transaction.getIdentifier(), t);
+
+                // Propagate the original exception as that is what caused the Tx to fail and is
+                // what's interesting to the client.
+                clientSubmitFuture.setException(clientException);
+            }
+        }, internalFutureCallbackExecutor);
+    }
+
+    /**
+     * A settable future that uses an {@link Executor} to execute listener callback Runnables,
+     * registered via {@link #addListener}, asynchronously when this future completes. This is
+     * done to guarantee listener executions are off-loaded onto another thread to avoid blocking
+     * the thread that completed this future, as a common use case is to pass an executor that runs
+     * tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor)
+     * to {@link #addListener}.
+     *
+     * FIXME: This class should probably be moved to yangtools common utils for re-usability and
+     * unified with AsyncNotifyingListenableFutureTask.
+     */
+    private static class AsyncNotifyingSettableFuture extends AbstractFuture<Void> {
+
+        /**
+         * ThreadLocal used to detect if the task completion thread is running the future listener Runnables.
+         */
+        private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<Boolean>();
+
+        private final ExecutorService listenerExecutor;
+
+        AsyncNotifyingSettableFuture(ExecutorService listenerExecutor) {
+            this.listenerExecutor = listenerExecutor;
+        }
+
+        @Override
+        public void addListener(final Runnable listener, final Executor executor) {
+            // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one
+            // that runs tasks in the same thread as the caller submitting the task
+            // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and
+            // the listener is executed from the #set methods, then the DelegatingRunnable will detect
+            // this via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
+            //
+            // On the other hand, if this task is already complete, the call to ExecutionList#add in
+            // superclass will execute the listener Runnable immediately and, since the ThreadLocal
+            // won't be set, the DelegatingRunnable will run the listener Runnable inline.
+            super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor);
+        }
+
+        boolean set() {
+            ON_TASK_COMPLETION_THREAD_TL.set(Boolean.TRUE);
+            try {
+                return super.set(null);
+            } finally {
+                ON_TASK_COMPLETION_THREAD_TL.set(null);
+            }
+        }
+
+        @Override
+        protected boolean setException(Throwable throwable) {
+            ON_TASK_COMPLETION_THREAD_TL.set(Boolean.TRUE);
+            try {
+                return super.setException(throwable);
+            } finally {
+                ON_TASK_COMPLETION_THREAD_TL.set(null);
+            }
+        }
+
+        private static final class DelegatingRunnable implements Runnable {
+            private final Runnable delegate;
+            private final Executor executor;
+
+            DelegatingRunnable(final Runnable delegate, final Executor executor) {
+                this.delegate = Preconditions.checkNotNull(delegate);
+                this.executor = Preconditions.checkNotNull(executor);
+            }
+
+            @Override
+            public void run() {
+                if (ON_TASK_COMPLETION_THREAD_TL.get() != null) {
+                    // We're running on the task completion thread so off-load to the executor.
+                    LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}",
+                            Thread.currentThread().getName(), executor);
+                    executor.execute(delegate);
+                } else {
+                    // We're not running on the task completion thread so run the delegate inline.
+                    LOG.trace("Executing ListenenableFuture Runnable on this thread: {}",
+                            Thread.currentThread().getName());
+                    delegate.run();
+                }
+            }
+        }
+    }
+
+    /**
+     * A simple same-thread executor without the internal locking overhead that
+     * MoreExecutors#sameThreadExecutor has. The #execute method is the only one of concern - we
+     * don't shutdown the executor so the other methods irrelevant.
+     */
+    private static class SimpleSameThreadExecutor extends AbstractListeningExecutorService {
+
+        @Override
+        public void execute(Runnable command) {
+            command.run();
+        }
+
+        @Override
+        public boolean awaitTermination(long arg0, TimeUnit arg1) throws InterruptedException {
+            return true;
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return false;
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return false;
+        }
+
+        @Override
+        public void shutdown() {
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return null;
+        }
+    }
+}
index 5fbf1270cc7110133637cdb70016f44356a388a3..3f7db01c6b3e0c09065c9c279be46a923a93df5a 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
 import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.EnumMap;
@@ -25,7 +26,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.DurationStatsTracker;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,25 +35,26 @@ public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory<DO
 
     private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class);
 
-    private final DOMDataCommitCoordinatorImpl coordinator;
+    private final DOMDataCommitExecutor coordinator;
     private final AtomicLong txNum = new AtomicLong();
     private final AtomicLong chainNum = new AtomicLong();
     private volatile AutoCloseable closeable;
 
     public DOMDataBrokerImpl(final Map<LogicalDatastoreType, DOMStore> datastores,
             final ListeningExecutorService executor) {
+        this(datastores, new DOMDataCommitCoordinatorImpl(executor));
+    }
+
+    public DOMDataBrokerImpl(final Map<LogicalDatastoreType, DOMStore> datastores,
+            final DOMDataCommitExecutor coordinator) {
         super(datastores);
-        this.coordinator = new DOMDataCommitCoordinatorImpl(executor);
+        this.coordinator = Preconditions.checkNotNull(coordinator);
     }
 
     public void setCloseable(final AutoCloseable closeable) {
         this.closeable = closeable;
     }
 
-    public DurationStatsTracker getCommitStatsTracker() {
-        return coordinator.getCommitStatsTracker();
-    }
-
     @Override
     public void close() {
         super.close();
index 15d7b1d966e1a59e028616bb85f5670f5805860b..c1ecaa67dfbd9af34f022c110ae35bc5b9002f83 100644 (file)
@@ -20,7 +20,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.util.DurationStatsTracker;
+import org.opendaylight.yangtools.util.DurationStatisticsTracker;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
     private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
-    private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
+    private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
     private final ListeningExecutorService executor;
 
     /**
@@ -56,7 +56,7 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
         this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
     }
 
-    public DurationStatsTracker getCommitStatsTracker() {
+    public DurationStatisticsTracker getCommitStatsTracker() {
         return commitStatsTracker;
     }
 
@@ -129,16 +129,16 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
                 AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
         private final DOMDataWriteTransaction tx;
         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
-        private final DurationStatsTracker commitStatTracker;
+        private final DurationStatisticsTracker commitStatTracker;
         private final int cohortSize;
         private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
 
         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
-                final DurationStatsTracker commitStatTracker) {
+                final DurationStatisticsTracker commitStatsTracker) {
             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
-            this.commitStatTracker = commitStatTracker;
+            this.commitStatTracker = commitStatsTracker;
             this.cohortSize = Iterables.size(cohorts);
         }
 
index 8aa97e72d1156fb1a8b19d1e3ca62ff715d11a95..dae14b5128f7794d86c50039818b664b6b186276 100644 (file)
@@ -21,7 +21,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
  *
  *
  */
-interface DOMDataCommitExecutor {
+public interface DOMDataCommitExecutor {
 
     /**
      * Submits supplied transaction to be executed in context of provided
index f67f6b01488c4584dca6ba547ab8230b2fab8f94..0d5306faf7fbb4f479b8c63504ef40d5aca71c48 100644 (file)
@@ -9,9 +9,8 @@
 package org.opendaylight.controller.md.sal.dom.broker.impl.jmx;
 
 import javax.annotation.Nonnull;
-
 import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
-import org.opendaylight.yangtools.util.DurationStatsTracker;
+import org.opendaylight.yangtools.util.DurationStatisticsTracker;
 
 /**
  * Implementation of the CommitStatsMXBean interface.
@@ -20,7 +19,7 @@ import org.opendaylight.yangtools.util.DurationStatsTracker;
  */
 public class CommitStatsMXBeanImpl extends AbstractMXBean implements CommitStatsMXBean {
 
-    private final DurationStatsTracker commitStatsTracker;
+    private final DurationStatisticsTracker commitStatsTracker;
 
     /**
      * Constructor.
@@ -28,7 +27,7 @@ public class CommitStatsMXBeanImpl extends AbstractMXBean implements CommitStats
      * @param commitStatsTracker the DurationStatsTracker used to obtain the stats.
      * @param mBeanType mBeanType Used as the <code>type</code> property in the bean's ObjectName.
      */
-    public CommitStatsMXBeanImpl(@Nonnull DurationStatsTracker commitStatsTracker,
+    public CommitStatsMXBeanImpl(@Nonnull DurationStatisticsTracker commitStatsTracker,
             @Nonnull String mBeanType) {
         super("CommitStats", mBeanType, null);
         this.commitStatsTracker = commitStatsTracker;
index e81f71a7d2069014b511b1162168131633d62c1a..fa6d4961939b6f5bcbf4f5eb50d042ea9aa86556 100644 (file)
@@ -120,6 +120,14 @@ module opendaylight-sal-dom-broker-impl {
                 type uint16;
                 description "The maximum queue size for the data broker's commit executor.";
             }
+
+            leaf allow-concurrent-commits {
+                default false;
+                type boolean;
+                description "Specifies whether or not to allow 3-phrase commits to run concurrently.
+                    Use with caution. If set to true, the data store implementations must be prepared
+                    to handle concurrent commits. The default is false";
+            }
         }
     }
     
diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinatorTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinatorTest.java
new file mode 100644 (file)
index 0000000..25d7df1
--- /dev/null
@@ -0,0 +1,260 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.inOrder;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Unit tests for DOMConcurrentDataCommitCoordinator.
+ *
+ * @author Thomas Pantelis
+ */
+public class DOMConcurrentDataCommitCoordinatorTest {
+
+    private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class);
+    private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
+    private final DOMStoreThreePhaseCommitCohort mockCohort2 = mock(DOMStoreThreePhaseCommitCohort.class);
+    private final ThreadPoolExecutor futureExecutor =
+            new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+    private final DOMConcurrentDataCommitCoordinator coordinator =
+            new DOMConcurrentDataCommitCoordinator(futureExecutor);
+
+    @Before
+    public void setup() {
+        doReturn("tx").when(transaction).getIdentifier();
+    }
+
+    @After
+    public void tearDown() {
+        futureExecutor.shutdownNow();
+    }
+
+    @Test
+    public void testSuccessfulSubmitAsync() throws Throwable {
+        testSuccessfulSubmit(true);
+    }
+
+    @Test
+    public void testSuccessfulSubmitSync() throws Throwable {
+        testSuccessfulSubmit(false);
+    }
+
+    private void testSuccessfulSubmit(final boolean doAsync) throws Throwable {
+        final CountDownLatch asyncCanCommitContinue = new CountDownLatch(1);
+        Answer<ListenableFuture<Boolean>> asyncCanCommit = new Answer<ListenableFuture<Boolean>>() {
+            @Override
+            public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
+                final SettableFuture<Boolean> future = SettableFuture.create();
+                if(doAsync) {
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue,
+                                    10, TimeUnit.SECONDS);
+                            future.set(true);
+                        }
+                    }.start();
+                } else {
+                    future.set(true);
+                }
+
+                return future;
+            }
+        };
+
+        doAnswer(asyncCanCommit).when(mockCohort1).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit();
+
+        doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort2).commit();
+
+        CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
+                transaction, Arrays.asList(mockCohort1, mockCohort2));
+
+        final CountDownLatch doneLatch = new CountDownLatch(1);
+        final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(Void result) {
+                doneLatch.countDown();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                caughtEx.set(t);
+                doneLatch.countDown();
+            }
+        });
+
+        asyncCanCommitContinue.countDown();
+
+        assertEquals("Submit complete", true, doneLatch.await(5, TimeUnit.SECONDS));
+
+        if(caughtEx.get() != null) {
+            throw caughtEx.get();
+        }
+
+        assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount());
+
+        InOrder inOrder = inOrder(mockCohort1, mockCohort2);
+        inOrder.verify(mockCohort1).canCommit();
+        inOrder.verify(mockCohort2).canCommit();
+        inOrder.verify(mockCohort1).preCommit();
+        inOrder.verify(mockCohort2).preCommit();
+        inOrder.verify(mockCohort1).commit();
+        inOrder.verify(mockCohort2).commit();
+    }
+
+    @Test
+    public void testSubmitWithNegativeCanCommitResponse() throws Exception {
+        doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
+
+        doReturn(Futures.immediateFuture(false)).when(mockCohort2).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
+
+        DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
+        doReturn(Futures.immediateFuture(false)).when(mockCohort3).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
+
+        CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
+                transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
+
+        assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3);
+    }
+
+    private void assertFailure(CheckedFuture<Void, TransactionCommitFailedException> future,
+            Exception expCause, DOMStoreThreePhaseCommitCohort... mockCohorts)
+                    throws Exception {
+        try {
+            future.checkedGet(5, TimeUnit.SECONDS);
+            fail("Expected TransactionCommitFailedException");
+        } catch (TransactionCommitFailedException e) {
+            if(expCause != null) {
+                assertSame("Expected cause", expCause, e.getCause());
+            }
+
+            InOrder inOrder = inOrder((Object[])mockCohorts);
+            for(DOMStoreThreePhaseCommitCohort c: mockCohorts) {
+                inOrder.verify(c).abort();
+            }
+        } catch (TimeoutException e) {
+            throw e;
+        }
+    }
+
+    @Test
+    public void testSubmitWithCanCommitException() throws Exception {
+        doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
+
+        IllegalStateException cause = new IllegalStateException("mock");
+        doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
+
+        CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
+                transaction, Arrays.asList(mockCohort1, mockCohort2));
+
+        assertFailure(future, cause, mockCohort1, mockCohort2);
+    }
+
+    @Test
+    public void testSubmitWithPreCommitException() throws Exception {
+        doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
+
+        doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
+        IllegalStateException cause = new IllegalStateException("mock");
+        doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).preCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
+
+        DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
+        doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit();
+        doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))).
+                when(mockCohort3).preCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
+
+        CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
+                transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
+
+        assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
+    }
+
+    @Test
+    public void testSubmitWithCommitException() throws Exception {
+        doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
+
+        doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit();
+        IllegalStateException cause = new IllegalStateException("mock");
+        doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).commit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
+
+        DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
+        doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort3).preCommit();
+        doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))).
+                when(mockCohort3).commit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
+
+        CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
+                transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
+
+        assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
+    }
+
+    @Test
+    public void testSubmitWithAbortException() throws Exception {
+        doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
+        doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock abort error"))).
+                when(mockCohort1).abort();
+
+        IllegalStateException cause = new IllegalStateException("mock canCommit error");
+        doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
+        doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
+
+        CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
+                transaction, Arrays.asList(mockCohort1, mockCohort2));
+
+        assertFailure(future, cause, mockCohort1, mockCohort2);
+    }
+}