From cc58fec4514081d82c515f171bf2411b15c88f7d Mon Sep 17 00:00:00 2001 From: tpantelis Date: Tue, 7 Oct 2014 09:34:01 -0400 Subject: [PATCH] Bug 2160: Added concurrent 3-phase commit coordinator Added a new class DOMConcurrentDataCommitCoordinator which is an implementation of DOMDataCommitExecutor that allows multiple concurrent 3-phase transaction commits. The 3 phases are still performed serially and non-blocking per transaction. A new flag, "allow-concurrent-commits", was added to the DOM in-memory data broker's yang config. If true, the new DOMConcurrentDataCommitCoordinator implementation is used, otherwise the current implementation is used. The default is false. The distributed data store will use the concurrent coordinator. Change-Id: Ie21947d6f9c1086f89686a73442c854c08c07100 Signed-off-by: tpantelis --- .../resources/initial/05-clustering.xml.conf | 2 + .../dom/impl/DomInmemoryDataBrokerModule.java | 83 +++-- .../DOMConcurrentDataCommitCoordinator.java | 344 ++++++++++++++++++ .../dom/broker/impl/DOMDataBrokerImpl.java | 15 +- .../impl/DOMDataCommitCoordinatorImpl.java | 12 +- .../broker/impl/DOMDataCommitExecutor.java | 2 +- .../impl/jmx/CommitStatsMXBeanImpl.java | 7 +- .../yang/opendaylight-dom-broker-impl.yang | 8 + ...OMConcurrentDataCommitCoordinatorTest.java | 260 +++++++++++++ 9 files changed, 688 insertions(+), 45 deletions(-) create mode 100644 opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinator.java create mode 100644 opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinatorTest.java diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf index fbb666a9ca..b57a3f5f0b 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf @@ -29,6 +29,8 @@ operational-dom-store-spi:operational-dom-datastore distributed-operational-store-service + + true diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java index ac62974d29..8f01a393c6 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java @@ -8,18 +8,24 @@ package org.opendaylight.controller.config.yang.md.sal.dom.impl; import java.util.EnumMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMConcurrentDataCommitCoordinator; import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataCommitCoordinatorImpl; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataCommitExecutor; import org.opendaylight.controller.md.sal.dom.broker.impl.jmx.CommitStatsMXBeanImpl; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.yangtools.util.DurationStatisticsTracker; import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; +import com.google.common.collect.Lists; /** * @@ -65,17 +71,6 @@ public final class DomInmemoryDataBrokerModule extends datastores.put(LogicalDatastoreType.OPERATIONAL, operStore); datastores.put(LogicalDatastoreType.CONFIGURATION, configStore); - /* - * We use a single-threaded executor for commits with a bounded queue capacity. If the - * queue capacity is reached, subsequent commit tasks will be rejected and the commits will - * fail. This is done to relieve back pressure. This should be an extreme scenario - either - * there's deadlock(s) somewhere and the controller is unstable or some rogue component is - * continuously hammering commits too fast or the controller is just over-capacity for the - * system it's running on. - */ - ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor( - getMaxDataBrokerCommitQueueSize(), "WriteTxCommit"); - /* * We use an executor for commit ListenableFuture callbacks that favors reusing available * threads over creating new threads at the expense of execution time. The assumption is @@ -88,31 +83,65 @@ public final class DomInmemoryDataBrokerModule extends getMaxDataBrokerFutureCallbackPoolSize(), getMaxDataBrokerFutureCallbackQueueSize(), "CommitFutures"); - DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, - new DeadlockDetectingListeningExecutorService(commitExecutor, - TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, - listenableFutureExecutor)); + final List mBeans = Lists.newArrayList(); + + DOMDataCommitExecutor commitCoordinator; + DurationStatisticsTracker commitStatsTracker = null; + + if(getAllowConcurrentCommits()) { + DOMConcurrentDataCommitCoordinator coordinator = + new DOMConcurrentDataCommitCoordinator(listenableFutureExecutor); + commitStatsTracker = coordinator.getCommitStatsTracker(); + commitCoordinator = coordinator; + } else { + /* + * We use a single-threaded executor for commits with a bounded queue capacity. If the + * queue capacity is reached, subsequent commit tasks will be rejected and the commits will + * fail. This is done to relieve back pressure. This should be an extreme scenario - either + * there's deadlock(s) somewhere and the controller is unstable or some rogue component is + * continuously hammering commits too fast or the controller is just over-capacity for the + * system it's running on. + */ + ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor( + getMaxDataBrokerCommitQueueSize(), "WriteTxCommit"); + + DOMDataCommitCoordinatorImpl coordinator = new DOMDataCommitCoordinatorImpl( + new DeadlockDetectingListeningExecutorService(commitExecutor, + TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, + listenableFutureExecutor)); + + commitStatsTracker = coordinator.getCommitStatsTracker(); + commitCoordinator = coordinator; + + final AbstractMXBean commitExecutorStatsMXBean = + ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats", + JMX_BEAN_TYPE, null); + if(commitExecutorStatsMXBean != null) { + mBeans.add(commitExecutorStatsMXBean); + } + } - final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl( - newDataBroker.getCommitStatsTracker(), JMX_BEAN_TYPE); - commitStatsMXBean.registerMBean(); + DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, commitCoordinator); + + if(commitStatsTracker != null) { + final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl( + commitStatsTracker, JMX_BEAN_TYPE); + commitStatsMXBean.registerMBean(); + mBeans.add(commitStatsMXBean); + } - final AbstractMXBean commitExecutorStatsMXBean = - ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats", - JMX_BEAN_TYPE, null); final AbstractMXBean commitFutureStatsMXBean = ThreadExecutorStatsMXBeanImpl.create(listenableFutureExecutor, "CommitFutureExecutorStats", JMX_BEAN_TYPE, null); + if(commitFutureStatsMXBean != null) { + mBeans.add(commitFutureStatsMXBean); + } newDataBroker.setCloseable(new AutoCloseable() { @Override public void close() { - commitStatsMXBean.unregisterMBean(); - if (commitExecutorStatsMXBean != null) { - commitExecutorStatsMXBean.unregisterMBean(); - } - if (commitFutureStatsMXBean != null) { - commitFutureStatsMXBean.unregisterMBean(); + for(AbstractMXBean mBean: mBeans) { + mBean.unregisterMBean(); } } }); diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinator.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinator.java new file mode 100644 index 0000000000..605d71d81e --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinator.java @@ -0,0 +1,344 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.util.DurationStatisticsTracker; +import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.AbstractListeningExecutorService; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * Implementation of DOMDataCommitExecutor that coordinates transaction commits concurrently. The 3 + * commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking + * (ie async) per transaction but multiple transaction commits can run concurrent. + * + * @author Thomas Pantelis + */ +public class DOMConcurrentDataCommitCoordinator implements DOMDataCommitExecutor { + + private static final String CAN_COMMIT = "CAN_COMMIT"; + private static final String PRE_COMMIT = "PRE_COMMIT"; + private static final String COMMIT = "COMMIT"; + + private static final Logger LOG = LoggerFactory.getLogger(DOMConcurrentDataCommitCoordinator.class); + + private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent(); + + /** + * This executor is used to execute Future listener callback Runnables async. + */ + private final ExecutorService clientFutureCallbackExecutor; + + /** + * This executor is re-used internally in calls to Futures#addCallback to avoid the overhead + * of Futures#addCallback creating a MoreExecutors#sameThreadExecutor for each call. + */ + private final ExecutorService internalFutureCallbackExecutor = new SimpleSameThreadExecutor(); + + public DOMConcurrentDataCommitCoordinator(ExecutorService listenableFutureExecutor) { + this.clientFutureCallbackExecutor = Preconditions.checkNotNull(listenableFutureExecutor); + } + + public DurationStatisticsTracker getCommitStatsTracker() { + return commitStatsTracker; + } + + @Override + public CheckedFuture submit(DOMDataWriteTransaction transaction, + Iterable cohorts) { + + Preconditions.checkArgument(transaction != null, "Transaction must not be null."); + Preconditions.checkArgument(cohorts != null, "Cohorts must not be null."); + LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); + + final int cohortSize = Iterables.size(cohorts); + final AsyncNotifyingSettableFuture clientSubmitFuture = + new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor); + + doCanCommit(clientSubmitFuture, transaction, cohorts, cohortSize); + + return MappingCheckedFuture.create(clientSubmitFuture, + TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + } + + private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture, + final DOMDataWriteTransaction transaction, + final Iterable cohorts, final int cohortSize) { + + final long startTime = System.nanoTime(); + + // Not using Futures.allAsList here to avoid its internal overhead. + final AtomicInteger remaining = new AtomicInteger(cohortSize); + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(Boolean result) { + if (result == null || !result) { + handleException(clientSubmitFuture, transaction, cohorts, cohortSize, + CAN_COMMIT, new TransactionCommitFailedException( + "Can Commit failed, no detailed cause available.")); + } else { + if(remaining.decrementAndGet() == 0) { + // All cohorts completed successfully - we can move on to the preCommit phase + doPreCommit(startTime, clientSubmitFuture, transaction, cohorts, cohortSize); + } + } + } + + @Override + public void onFailure(Throwable t) { + handleException(clientSubmitFuture, transaction, cohorts, cohortSize, CAN_COMMIT, t); + } + }; + + for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { + ListenableFuture canCommitFuture = cohort.canCommit(); + Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor); + } + } + + private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, + final DOMDataWriteTransaction transaction, + final Iterable cohorts, final int cohortSize) { + + // Not using Futures.allAsList here to avoid its internal overhead. + final AtomicInteger remaining = new AtomicInteger(cohortSize); + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(Void notUsed) { + if(remaining.decrementAndGet() == 0) { + // All cohorts completed successfully - we can move on to the commit phase + doCommit(startTime, clientSubmitFuture, transaction, cohorts, cohortSize); + } + } + + @Override + public void onFailure(Throwable t) { + handleException(clientSubmitFuture, transaction, cohorts, cohortSize, CAN_COMMIT, t); + } + }; + + for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { + ListenableFuture preCommitFuture = cohort.preCommit(); + Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor); + } + } + + private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture, + final DOMDataWriteTransaction transaction, + final Iterable cohorts, final int cohortSize) { + + // Not using Futures.allAsList here to avoid its internal overhead. + final AtomicInteger remaining = new AtomicInteger(cohortSize); + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(Void notUsed) { + if(remaining.decrementAndGet() == 0) { + // All cohorts completed successfully - we're done. + commitStatsTracker.addDuration(System.nanoTime() - startTime); + + clientSubmitFuture.set(); + } + } + + @Override + public void onFailure(Throwable t) { + handleException(clientSubmitFuture, transaction, cohorts, cohortSize, CAN_COMMIT, t); + } + }; + + for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { + ListenableFuture commitFuture = cohort.commit(); + Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor); + } + } + + private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture, + final DOMDataWriteTransaction transaction, + final Iterable cohorts, int cohortSize, + final String phase, final Throwable t) { + + if(clientSubmitFuture.isDone()) { + // We must have had failures from multiple cohorts. + return; + } + + LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t); + Exception e; + if(t instanceof Exception) { + e = (Exception)t; + } else { + e = new RuntimeException("Unexpected error occurred", t); + } + + final TransactionCommitFailedException clientException = + TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e); + + // Transaction failed - tell all cohorts to abort. + + @SuppressWarnings("unchecked") + ListenableFuture[] canCommitFutures = new ListenableFuture[cohortSize]; + int i = 0; + for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { + canCommitFutures[i++] = cohort.abort(); + } + + ListenableFuture> combinedFuture = Futures.allAsList(canCommitFutures); + Futures.addCallback(combinedFuture, new FutureCallback>() { + @Override + public void onSuccess(List notUsed) { + // Propagate the original exception to the client. + clientSubmitFuture.setException(clientException); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Tx: {} Error during Abort.", transaction.getIdentifier(), t); + + // Propagate the original exception as that is what caused the Tx to fail and is + // what's interesting to the client. + clientSubmitFuture.setException(clientException); + } + }, internalFutureCallbackExecutor); + } + + /** + * A settable future that uses an {@link Executor} to execute listener callback Runnables, + * registered via {@link #addListener}, asynchronously when this future completes. This is + * done to guarantee listener executions are off-loaded onto another thread to avoid blocking + * the thread that completed this future, as a common use case is to pass an executor that runs + * tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor) + * to {@link #addListener}. + * + * FIXME: This class should probably be moved to yangtools common utils for re-usability and + * unified with AsyncNotifyingListenableFutureTask. + */ + private static class AsyncNotifyingSettableFuture extends AbstractFuture { + + /** + * ThreadLocal used to detect if the task completion thread is running the future listener Runnables. + */ + private static final ThreadLocal ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal(); + + private final ExecutorService listenerExecutor; + + AsyncNotifyingSettableFuture(ExecutorService listenerExecutor) { + this.listenerExecutor = listenerExecutor; + } + + @Override + public void addListener(final Runnable listener, final Executor executor) { + // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one + // that runs tasks in the same thread as the caller submitting the task + // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and + // the listener is executed from the #set methods, then the DelegatingRunnable will detect + // this via the ThreadLocal and submit the listener Runnable to the listenerExecutor. + // + // On the other hand, if this task is already complete, the call to ExecutionList#add in + // superclass will execute the listener Runnable immediately and, since the ThreadLocal + // won't be set, the DelegatingRunnable will run the listener Runnable inline. + super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor); + } + + boolean set() { + ON_TASK_COMPLETION_THREAD_TL.set(Boolean.TRUE); + try { + return super.set(null); + } finally { + ON_TASK_COMPLETION_THREAD_TL.set(null); + } + } + + @Override + protected boolean setException(Throwable throwable) { + ON_TASK_COMPLETION_THREAD_TL.set(Boolean.TRUE); + try { + return super.setException(throwable); + } finally { + ON_TASK_COMPLETION_THREAD_TL.set(null); + } + } + + private static final class DelegatingRunnable implements Runnable { + private final Runnable delegate; + private final Executor executor; + + DelegatingRunnable(final Runnable delegate, final Executor executor) { + this.delegate = Preconditions.checkNotNull(delegate); + this.executor = Preconditions.checkNotNull(executor); + } + + @Override + public void run() { + if (ON_TASK_COMPLETION_THREAD_TL.get() != null) { + // We're running on the task completion thread so off-load to the executor. + LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}", + Thread.currentThread().getName(), executor); + executor.execute(delegate); + } else { + // We're not running on the task completion thread so run the delegate inline. + LOG.trace("Executing ListenenableFuture Runnable on this thread: {}", + Thread.currentThread().getName()); + delegate.run(); + } + } + } + } + + /** + * A simple same-thread executor without the internal locking overhead that + * MoreExecutors#sameThreadExecutor has. The #execute method is the only one of concern - we + * don't shutdown the executor so the other methods irrelevant. + */ + private static class SimpleSameThreadExecutor extends AbstractListeningExecutorService { + + @Override + public void execute(Runnable command) { + command.run(); + } + + @Override + public boolean awaitTermination(long arg0, TimeUnit arg1) throws InterruptedException { + return true; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public void shutdown() { + } + + @Override + public List shutdownNow() { + return null; + } + } +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java index 5fbf1270cc..3f7db01c6b 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListeningExecutorService; import java.util.EnumMap; @@ -25,7 +26,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.util.DurationStatsTracker; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,25 +35,26 @@ public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory datastores, final ListeningExecutorService executor) { + this(datastores, new DOMDataCommitCoordinatorImpl(executor)); + } + + public DOMDataBrokerImpl(final Map datastores, + final DOMDataCommitExecutor coordinator) { super(datastores); - this.coordinator = new DOMDataCommitCoordinatorImpl(executor); + this.coordinator = Preconditions.checkNotNull(coordinator); } public void setCloseable(final AutoCloseable closeable) { this.closeable = closeable; } - public DurationStatsTracker getCommitStatsTracker() { - return coordinator.getCommitStatsTracker(); - } - @Override public void close() { super.close(); diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java index 15d7b1d966..c1ecaa67df 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -20,7 +20,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.util.DurationStatsTracker; +import org.opendaylight.yangtools.util.DurationStatisticsTracker; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory; public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class); - private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker(); + private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent(); private final ListeningExecutorService executor; /** @@ -56,7 +56,7 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { this.executor = Preconditions.checkNotNull(executor, "executor must not be null."); } - public DurationStatsTracker getCommitStatsTracker() { + public DurationStatisticsTracker getCommitStatsTracker() { return commitStatsTracker; } @@ -129,16 +129,16 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase"); private final DOMDataWriteTransaction tx; private final Iterable cohorts; - private final DurationStatsTracker commitStatTracker; + private final DurationStatisticsTracker commitStatTracker; private final int cohortSize; private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED; public CommitCoordinationTask(final DOMDataWriteTransaction transaction, final Iterable cohorts, - final DurationStatsTracker commitStatTracker) { + final DurationStatisticsTracker commitStatsTracker) { this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); - this.commitStatTracker = commitStatTracker; + this.commitStatTracker = commitStatsTracker; this.cohortSize = Iterables.size(cohorts); } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java index 8aa97e72d1..dae14b5128 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java @@ -21,7 +21,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh * * */ -interface DOMDataCommitExecutor { +public interface DOMDataCommitExecutor { /** * Submits supplied transaction to be executed in context of provided diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/jmx/CommitStatsMXBeanImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/jmx/CommitStatsMXBeanImpl.java index f67f6b0148..0d5306faf7 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/jmx/CommitStatsMXBeanImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/jmx/CommitStatsMXBeanImpl.java @@ -9,9 +9,8 @@ package org.opendaylight.controller.md.sal.dom.broker.impl.jmx; import javax.annotation.Nonnull; - import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; -import org.opendaylight.yangtools.util.DurationStatsTracker; +import org.opendaylight.yangtools.util.DurationStatisticsTracker; /** * Implementation of the CommitStatsMXBean interface. @@ -20,7 +19,7 @@ import org.opendaylight.yangtools.util.DurationStatsTracker; */ public class CommitStatsMXBeanImpl extends AbstractMXBean implements CommitStatsMXBean { - private final DurationStatsTracker commitStatsTracker; + private final DurationStatisticsTracker commitStatsTracker; /** * Constructor. @@ -28,7 +27,7 @@ public class CommitStatsMXBeanImpl extends AbstractMXBean implements CommitStats * @param commitStatsTracker the DurationStatsTracker used to obtain the stats. * @param mBeanType mBeanType Used as the type property in the bean's ObjectName. */ - public CommitStatsMXBeanImpl(@Nonnull DurationStatsTracker commitStatsTracker, + public CommitStatsMXBeanImpl(@Nonnull DurationStatisticsTracker commitStatsTracker, @Nonnull String mBeanType) { super("CommitStats", mBeanType, null); this.commitStatsTracker = commitStatsTracker; diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/yang/opendaylight-dom-broker-impl.yang b/opendaylight/md-sal/sal-dom-broker/src/main/yang/opendaylight-dom-broker-impl.yang index e81f71a7d2..fa6d496193 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/yang/opendaylight-dom-broker-impl.yang +++ b/opendaylight/md-sal/sal-dom-broker/src/main/yang/opendaylight-dom-broker-impl.yang @@ -120,6 +120,14 @@ module opendaylight-sal-dom-broker-impl { type uint16; description "The maximum queue size for the data broker's commit executor."; } + + leaf allow-concurrent-commits { + default false; + type boolean; + description "Specifies whether or not to allow 3-phrase commits to run concurrently. + Use with caution. If set to true, the data store implementations must be prepared + to handle concurrent commits. The default is false"; + } } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinatorTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinatorTest.java new file mode 100644 index 0000000000..25d7df17d1 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMConcurrentDataCommitCoordinatorTest.java @@ -0,0 +1,260 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.inOrder; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.Uninterruptibles; + +/** + * Unit tests for DOMConcurrentDataCommitCoordinator. + * + * @author Thomas Pantelis + */ +public class DOMConcurrentDataCommitCoordinatorTest { + + private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class); + private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class); + private final DOMStoreThreePhaseCommitCohort mockCohort2 = mock(DOMStoreThreePhaseCommitCohort.class); + private final ThreadPoolExecutor futureExecutor = + new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue()); + private final DOMConcurrentDataCommitCoordinator coordinator = + new DOMConcurrentDataCommitCoordinator(futureExecutor); + + @Before + public void setup() { + doReturn("tx").when(transaction).getIdentifier(); + } + + @After + public void tearDown() { + futureExecutor.shutdownNow(); + } + + @Test + public void testSuccessfulSubmitAsync() throws Throwable { + testSuccessfulSubmit(true); + } + + @Test + public void testSuccessfulSubmitSync() throws Throwable { + testSuccessfulSubmit(false); + } + + private void testSuccessfulSubmit(final boolean doAsync) throws Throwable { + final CountDownLatch asyncCanCommitContinue = new CountDownLatch(1); + Answer> asyncCanCommit = new Answer>() { + @Override + public ListenableFuture answer(InvocationOnMock invocation) { + final SettableFuture future = SettableFuture.create(); + if(doAsync) { + new Thread() { + @Override + public void run() { + Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue, + 10, TimeUnit.SECONDS); + future.set(true); + } + }.start(); + } else { + future.set(true); + } + + return future; + } + }; + + doAnswer(asyncCanCommit).when(mockCohort1).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit(); + + doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort2).commit(); + + CheckedFuture future = coordinator.submit( + transaction, Arrays.asList(mockCohort1, mockCohort2)); + + final CountDownLatch doneLatch = new CountDownLatch(1); + final AtomicReference caughtEx = new AtomicReference<>(); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Void result) { + doneLatch.countDown(); + } + + @Override + public void onFailure(Throwable t) { + caughtEx.set(t); + doneLatch.countDown(); + } + }); + + asyncCanCommitContinue.countDown(); + + assertEquals("Submit complete", true, doneLatch.await(5, TimeUnit.SECONDS)); + + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount()); + + InOrder inOrder = inOrder(mockCohort1, mockCohort2); + inOrder.verify(mockCohort1).canCommit(); + inOrder.verify(mockCohort2).canCommit(); + inOrder.verify(mockCohort1).preCommit(); + inOrder.verify(mockCohort2).preCommit(); + inOrder.verify(mockCohort1).commit(); + inOrder.verify(mockCohort2).commit(); + } + + @Test + public void testSubmitWithNegativeCanCommitResponse() throws Exception { + doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort(); + + doReturn(Futures.immediateFuture(false)).when(mockCohort2).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort(); + + DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class); + doReturn(Futures.immediateFuture(false)).when(mockCohort3).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort(); + + CheckedFuture future = coordinator.submit( + transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3)); + + assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3); + } + + private void assertFailure(CheckedFuture future, + Exception expCause, DOMStoreThreePhaseCommitCohort... mockCohorts) + throws Exception { + try { + future.checkedGet(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (TransactionCommitFailedException e) { + if(expCause != null) { + assertSame("Expected cause", expCause, e.getCause()); + } + + InOrder inOrder = inOrder((Object[])mockCohorts); + for(DOMStoreThreePhaseCommitCohort c: mockCohorts) { + inOrder.verify(c).abort(); + } + } catch (TimeoutException e) { + throw e; + } + } + + @Test + public void testSubmitWithCanCommitException() throws Exception { + doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort(); + + IllegalStateException cause = new IllegalStateException("mock"); + doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort(); + + CheckedFuture future = coordinator.submit( + transaction, Arrays.asList(mockCohort1, mockCohort2)); + + assertFailure(future, cause, mockCohort1, mockCohort2); + } + + @Test + public void testSubmitWithPreCommitException() throws Exception { + doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort(); + + doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit(); + IllegalStateException cause = new IllegalStateException("mock"); + doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).preCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort(); + + DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class); + doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit(); + doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))). + when(mockCohort3).preCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort(); + + CheckedFuture future = coordinator.submit( + transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3)); + + assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3); + } + + @Test + public void testSubmitWithCommitException() throws Exception { + doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort(); + + doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit(); + IllegalStateException cause = new IllegalStateException("mock"); + doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).commit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort(); + + DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class); + doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort3).preCommit(); + doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))). + when(mockCohort3).commit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort(); + + CheckedFuture future = coordinator.submit( + transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3)); + + assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3); + } + + @Test + public void testSubmitWithAbortException() throws Exception { + doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit(); + doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock abort error"))). + when(mockCohort1).abort(); + + IllegalStateException cause = new IllegalStateException("mock canCommit error"); + doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit(); + doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort(); + + CheckedFuture future = coordinator.submit( + transaction, Arrays.asList(mockCohort1, mockCohort2)); + + assertFailure(future, cause, mockCohort1, mockCohort2); + } +} -- 2.36.6