X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconfig%2Fyang%2Fmd%2Fsal%2Fdom%2Fimpl%2FDomInmemoryDataBrokerModule.java;h=8f01a393c6eb28cb494fe89c063d5ab3085237be;hb=73e969cf365dd78772596c71e940ae44fe2f22d3;hp=0841435785ebb70a30dc10acc24fa13c1d6ad3bf;hpb=daf965d90a98f590751a1da6e61558497fff4e0a;p=controller.git diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java index 0841435785..8f01a393c6 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java @@ -7,17 +7,25 @@ */ package org.opendaylight.controller.config.yang.md.sal.dom.impl; -import com.google.common.collect.ImmutableMap; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; +import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMConcurrentDataCommitCoordinator; import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataCommitCoordinatorImpl; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataCommitExecutor; import org.opendaylight.controller.md.sal.dom.broker.impl.jmx.CommitStatsMXBeanImpl; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.yangtools.util.DurationStatisticsTracker; import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; +import com.google.common.collect.Lists; /** * @@ -58,20 +66,10 @@ public final class DomInmemoryDataBrokerModule extends //we will default to InMemoryDOMDataStore creation configStore = InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency()); } - ImmutableMap datastores = ImmutableMap - . builder().put(LogicalDatastoreType.OPERATIONAL, operStore) - .put(LogicalDatastoreType.CONFIGURATION, configStore).build(); - /* - * We use a single-threaded executor for commits with a bounded queue capacity. If the - * queue capacity is reached, subsequent commit tasks will be rejected and the commits will - * fail. This is done to relieve back pressure. This should be an extreme scenario - either - * there's deadlock(s) somewhere and the controller is unstable or some rogue component is - * continuously hammering commits too fast or the controller is just over-capacity for the - * system it's running on. - */ - ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor( - getMaxDataBrokerCommitQueueSize(), "WriteTxCommit"); + final Map datastores = new EnumMap<>(LogicalDatastoreType.class); + datastores.put(LogicalDatastoreType.OPERATIONAL, operStore); + datastores.put(LogicalDatastoreType.CONFIGURATION, configStore); /* * We use an executor for commit ListenableFuture callbacks that favors reusing available @@ -85,31 +83,66 @@ public final class DomInmemoryDataBrokerModule extends getMaxDataBrokerFutureCallbackPoolSize(), getMaxDataBrokerFutureCallbackQueueSize(), "CommitFutures"); - DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, - new DeadlockDetectingListeningExecutorService(commitExecutor, - TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, - listenableFutureExecutor)); + final List mBeans = Lists.newArrayList(); + + DOMDataCommitExecutor commitCoordinator; + DurationStatisticsTracker commitStatsTracker = null; + + if(getAllowConcurrentCommits()) { + DOMConcurrentDataCommitCoordinator coordinator = + new DOMConcurrentDataCommitCoordinator(listenableFutureExecutor); + commitStatsTracker = coordinator.getCommitStatsTracker(); + commitCoordinator = coordinator; + } else { + /* + * We use a single-threaded executor for commits with a bounded queue capacity. If the + * queue capacity is reached, subsequent commit tasks will be rejected and the commits will + * fail. This is done to relieve back pressure. This should be an extreme scenario - either + * there's deadlock(s) somewhere and the controller is unstable or some rogue component is + * continuously hammering commits too fast or the controller is just over-capacity for the + * system it's running on. + */ + ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor( + getMaxDataBrokerCommitQueueSize(), "WriteTxCommit"); + + DOMDataCommitCoordinatorImpl coordinator = new DOMDataCommitCoordinatorImpl( + new DeadlockDetectingListeningExecutorService(commitExecutor, + TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, + listenableFutureExecutor)); + + commitStatsTracker = coordinator.getCommitStatsTracker(); + commitCoordinator = coordinator; + + final AbstractMXBean commitExecutorStatsMXBean = + ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats", + JMX_BEAN_TYPE, null); + if(commitExecutorStatsMXBean != null) { + mBeans.add(commitExecutorStatsMXBean); + } + } - final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl( - newDataBroker.getCommitStatsTracker(), JMX_BEAN_TYPE); - commitStatsMXBean.registerMBean(); + DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, commitCoordinator); - final ThreadExecutorStatsMXBeanImpl commitExecutorStatsMXBean = - new ThreadExecutorStatsMXBeanImpl(commitExecutor, "CommitExecutorStats", - JMX_BEAN_TYPE, null); - commitExecutorStatsMXBean.registerMBean(); + if(commitStatsTracker != null) { + final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl( + commitStatsTracker, JMX_BEAN_TYPE); + commitStatsMXBean.registerMBean(); + mBeans.add(commitStatsMXBean); + } - final ThreadExecutorStatsMXBeanImpl commitFutureStatsMXBean = - new ThreadExecutorStatsMXBeanImpl(listenableFutureExecutor, + final AbstractMXBean commitFutureStatsMXBean = + ThreadExecutorStatsMXBeanImpl.create(listenableFutureExecutor, "CommitFutureExecutorStats", JMX_BEAN_TYPE, null); - commitFutureStatsMXBean.registerMBean(); + if(commitFutureStatsMXBean != null) { + mBeans.add(commitFutureStatsMXBean); + } newDataBroker.setCloseable(new AutoCloseable() { @Override public void close() { - commitStatsMXBean.unregisterMBean(); - commitExecutorStatsMXBean.unregisterMBean(); - commitFutureStatsMXBean.unregisterMBean(); + for(AbstractMXBean mBean: mBeans) { + mBean.unregisterMBean(); + } } });