X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconfig%2Fyang%2Fmd%2Fsal%2Fdom%2Fimpl%2FDomInmemoryDataBrokerModule.java;h=8f01a393c6eb28cb494fe89c063d5ab3085237be;hb=73e969cf365dd78772596c71e940ae44fe2f22d3;hp=8664e8910b46bda523951f22961d59c13cbaec42;hpb=63b36aa3537d77bd9be323e1113716ef2cd54098;p=controller.git diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java index 8664e8910b..8f01a393c6 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java @@ -7,16 +7,25 @@ */ package org.opendaylight.controller.config.yang.md.sal.dom.impl; -import java.util.concurrent.Executor; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; +import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; +import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMConcurrentDataCommitCoordinator; import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataCommitCoordinatorImpl; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataCommitExecutor; +import org.opendaylight.controller.md.sal.dom.broker.impl.jmx.CommitStatsMXBeanImpl; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.yangtools.util.DurationStatisticsTracker; import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; /** * @@ -24,6 +33,8 @@ import com.google.common.collect.ImmutableMap; public final class DomInmemoryDataBrokerModule extends org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractDomInmemoryDataBrokerModule { + private static final String JMX_BEAN_TYPE = "DOMDataBroker"; + public DomInmemoryDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); @@ -55,20 +66,10 @@ public final class DomInmemoryDataBrokerModule extends //we will default to InMemoryDOMDataStore creation configStore = InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency()); } - ImmutableMap datastores = ImmutableMap - . builder().put(LogicalDatastoreType.OPERATIONAL, operStore) - .put(LogicalDatastoreType.CONFIGURATION, configStore).build(); - /* - * We use a single-threaded executor for commits with a bounded queue capacity. If the - * queue capacity is reached, subsequent commit tasks will be rejected and the commits will - * fail. This is done to relieve back pressure. This should be an extreme scenario - either - * there's deadlock(s) somewhere and the controller is unstable or some rogue component is - * continuously hammering commits too fast or the controller is just over-capacity for the - * system it's running on. - */ - ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor( - getMaxDataBrokerCommitQueueSize(), "WriteTxCommit"); + final Map datastores = new EnumMap<>(LogicalDatastoreType.class); + datastores.put(LogicalDatastoreType.OPERATIONAL, operStore); + datastores.put(LogicalDatastoreType.CONFIGURATION, configStore); /* * We use an executor for commit ListenableFuture callbacks that favors reusing available @@ -78,14 +79,72 @@ public final class DomInmemoryDataBrokerModule extends * nothing on success. The executor queue capacity is bounded and, if the capacity is * reached, subsequent submitted tasks will block the caller. */ - Executor listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool( + ExecutorService listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool( getMaxDataBrokerFutureCallbackPoolSize(), getMaxDataBrokerFutureCallbackQueueSize(), "CommitFutures"); - DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, - new DeadlockDetectingListeningExecutorService(commitExecutor, - TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION, - listenableFutureExecutor)); + final List mBeans = Lists.newArrayList(); + + DOMDataCommitExecutor commitCoordinator; + DurationStatisticsTracker commitStatsTracker = null; + + if(getAllowConcurrentCommits()) { + DOMConcurrentDataCommitCoordinator coordinator = + new DOMConcurrentDataCommitCoordinator(listenableFutureExecutor); + commitStatsTracker = coordinator.getCommitStatsTracker(); + commitCoordinator = coordinator; + } else { + /* + * We use a single-threaded executor for commits with a bounded queue capacity. If the + * queue capacity is reached, subsequent commit tasks will be rejected and the commits will + * fail. This is done to relieve back pressure. This should be an extreme scenario - either + * there's deadlock(s) somewhere and the controller is unstable or some rogue component is + * continuously hammering commits too fast or the controller is just over-capacity for the + * system it's running on. + */ + ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor( + getMaxDataBrokerCommitQueueSize(), "WriteTxCommit"); + + DOMDataCommitCoordinatorImpl coordinator = new DOMDataCommitCoordinatorImpl( + new DeadlockDetectingListeningExecutorService(commitExecutor, + TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, + listenableFutureExecutor)); + + commitStatsTracker = coordinator.getCommitStatsTracker(); + commitCoordinator = coordinator; + + final AbstractMXBean commitExecutorStatsMXBean = + ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats", + JMX_BEAN_TYPE, null); + if(commitExecutorStatsMXBean != null) { + mBeans.add(commitExecutorStatsMXBean); + } + } + + DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, commitCoordinator); + + if(commitStatsTracker != null) { + final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl( + commitStatsTracker, JMX_BEAN_TYPE); + commitStatsMXBean.registerMBean(); + mBeans.add(commitStatsMXBean); + } + + final AbstractMXBean commitFutureStatsMXBean = + ThreadExecutorStatsMXBeanImpl.create(listenableFutureExecutor, + "CommitFutureExecutorStats", JMX_BEAN_TYPE, null); + if(commitFutureStatsMXBean != null) { + mBeans.add(commitFutureStatsMXBean); + } + + newDataBroker.setCloseable(new AutoCloseable() { + @Override + public void close() { + for(AbstractMXBean mBean: mBeans) { + mBean.unregisterMBean(); + } + } + }); return newDataBroker; }