public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
bindingDataChangeListener.onDataChanged(new TranslatedDataChangeEvent(change, path));
}
+
+ @Override
+ public String toString() {
+ return bindingDataChangeListener.getClass().getName();
+ }
}
private class TranslatedDataChangeEvent implements AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> {
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import java.lang.management.ManagementFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Abstract base for an MXBean implementation class.
+ * <p>
+ * This class is not intended for use outside of MD-SAL and its part of private
+ * implementation (still exported as public to be reused across MD-SAL implementation
+ * components) and may be removed in subsequent
+ * releases.
+ *
+ * @author Thomas Pantelis
+ */
+@Beta
+public abstract class AbstractMXBean {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractMXBean.class);
+
+ public static String BASE_JMX_PREFIX = "org.opendaylight.controller:";
+
+ private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+ private final String mBeanName;
+ private final String mBeanType;
+ private final String mBeanCategory;
+
+ /**
+ * Constructor.
+ *
+ * @param mBeanName Used as the <code>name</code> property in the bean's ObjectName.
+ * @param mBeanType Used as the <code>type</code> property in the bean's ObjectName.
+ * @param mBeanCategory Used as the <code>Category</code> property in the bean's ObjectName.
+ */
+ protected AbstractMXBean(@Nonnull String mBeanName, @Nonnull String mBeanType,
+ @Nullable String mBeanCategory) {
+ this.mBeanName = mBeanName;
+ this.mBeanType = mBeanType;
+ this.mBeanCategory = mBeanCategory;
+ }
+
+ private ObjectName getMBeanObjectName() throws MalformedObjectNameException {
+ StringBuilder builder = new StringBuilder(BASE_JMX_PREFIX)
+ .append("type=").append(getMBeanType());
+
+ if(getMBeanCategory() != null) {
+ builder.append(",Category=").append(getMBeanCategory());
+ }
+
+ builder.append(",name=").append(getMBeanName());
+ return new ObjectName(builder.toString());
+ }
+
+ /**
+ * Registers this bean with the platform MBean server with the domain defined by
+ * {@link #BASE_JMX_PREFIX}.
+ *
+ * @return true is successfully registered, false otherwise.
+ */
+ public boolean registerMBean() {
+ boolean registered = false;
+ try {
+ // Object to identify MBean
+ final ObjectName mbeanName = this.getMBeanObjectName();
+
+ LOG.debug("Register MBean {}", mbeanName);
+
+ // unregistered if already registered
+ if(server.isRegistered(mbeanName)) {
+
+ LOG.debug("MBean {} found to be already registered", mbeanName);
+
+ try {
+ unregisterMBean(mbeanName);
+ } catch(Exception e) {
+
+ LOG.warn("unregister mbean {} resulted in exception {} ", mbeanName, e);
+ }
+ }
+ server.registerMBean(this, mbeanName);
+ registered = true;
+
+ LOG.debug("MBean {} registered successfully", mbeanName.getCanonicalName());
+ } catch(Exception e) {
+
+ LOG.error("registration failed {}", e);
+
+ }
+ return registered;
+ }
+
+ /**
+ * Unregisters this bean with the platform MBean server.
+ *
+ * @return true is successfully unregistered, false otherwise.
+ */
+ public boolean unregisterMBean() {
+ boolean unregister = false;
+ try {
+ ObjectName mbeanName = this.getMBeanObjectName();
+ unregisterMBean(mbeanName);
+ unregister = true;
+ } catch(Exception e) {
+
+ LOG.error("Failed when unregistering MBean {}", e);
+ }
+
+ return unregister;
+ }
+
+ private void unregisterMBean(ObjectName mbeanName) throws MBeanRegistrationException,
+ InstanceNotFoundException {
+ server.unregisterMBean(mbeanName);
+ }
+
+ /**
+ * Returns the <code>name</code> property of the bean's ObjectName.
+ */
+ public String getMBeanName() {
+ return mBeanName;
+ }
+
+ /**
+ * Returns the <code>type</code> property of the bean's ObjectName.
+ */
+ public String getMBeanType() {
+ return mBeanType;
+ }
+
+ /**
+ * Returns the <code>Category</code> property of the bean's ObjectName.
+ */
+ public String getMBeanCategory() {
+ return mBeanCategory;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import java.util.List;
+
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+
+/**
+ * MXBean interface for {@link QueuedNotificationManager} statistic metrics.
+ *
+ * @author Thomas Pantelis
+ */
+public interface QueuedNotificationManagerMXBean {
+
+ /**
+ * Returns a list of stat instances for each current listener notification task in progress.
+ */
+ List<ListenerNotificationQueueStats> getCurrentListenerQueueStats();
+
+ /**
+ * Returns the configured maximum listener queue size.
+ */
+ int getMaxListenerQueueSize();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import java.util.List;
+
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of the QueuedNotificationManagerMXBean interface.
+ *
+ * <p>
+ * This class is not intended for use outside of MD-SAL and its part of private
+ * implementation (still exported as public to be reused across MD-SAL implementation
+ * components) and may be removed in subsequent
+ * releases.
+ *
+ * @author Thomas Pantelis
+ */
+public class QueuedNotificationManagerMXBeanImpl extends AbstractMXBean
+ implements QueuedNotificationManagerMXBean {
+
+ private final QueuedNotificationManager<?,?> manager;
+
+ public QueuedNotificationManagerMXBeanImpl( QueuedNotificationManager<?,?> manager,
+ String mBeanName, String mBeanType, String mBeanCategory ) {
+ super(mBeanName, mBeanType, mBeanCategory);
+ this.manager = Preconditions.checkNotNull( manager );
+ }
+
+ @Override
+ public List<ListenerNotificationQueueStats> getCurrentListenerQueueStats() {
+ return manager.getListenerNotificationQueueStats();
+ }
+
+ @Override
+ public int getMaxListenerQueueSize() {
+ return manager.getMaxQueueCapacity();
+ }
+
+ public QueuedNotificationManagerStats toQueuedNotificationManagerStats() {
+ return new QueuedNotificationManagerStats( getMaxListenerQueueSize(),
+ getCurrentListenerQueueStats() );
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import java.beans.ConstructorProperties;
+import java.util.List;
+
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+
+/**
+ * A bean class that holds various QueuedNotificationManager statistic metrics. This class is
+ * suitable for mapping to the MXBean CompositeDataSupport type.
+ *
+ * <p>
+ * This class is not intended for use outside of MD-SAL and its part of private
+ * implementation (still exported as public to be reused across MD-SAL implementation
+ * components) and may be removed in subsequent
+ * releases.
+ * @author Thomas Pantelis
+ * @see QueuedNotificationManagerMXBeanImpl
+ */
+public class QueuedNotificationManagerStats {
+
+ private final int maxListenerQueueSize;
+ private final List<ListenerNotificationQueueStats> currentListenerQueueStats;
+
+ @ConstructorProperties({"maxListenerQueueSize","currentListenerQueueStats"})
+ public QueuedNotificationManagerStats( int maxListenerQueueSize,
+ List<ListenerNotificationQueueStats> currentListenerQueueStats ) {
+ super();
+ this.maxListenerQueueSize = maxListenerQueueSize;
+ this.currentListenerQueueStats = currentListenerQueueStats;
+ }
+
+ public List<ListenerNotificationQueueStats> getCurrentListenerQueueStats() {
+ return currentListenerQueueStats;
+ }
+
+ public int getMaxListenerQueueSize() {
+ return maxListenerQueueSize;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import java.beans.ConstructorProperties;
+
+/**
+ * A bean class that holds various thread executor statistic metrics. This class is suitable for
+ * mapping to the MXBean CompositeDataSupport type;
+ *
+ * @author Thomas Pantelis
+ * @see ThreadExecutorStatsMXBeanImpl
+ */
+public class ThreadExecutorStats {
+
+ private final long activeThreadCount;
+ private final long completedTaskCount;
+ private final long currentQueueSize;
+ private final long maxThreadPoolSize;
+ private final long totalTaskCount;
+ private final long largestThreadPoolSize;
+ private final long maxQueueSize;
+ private final long currentThreadPoolSize;
+
+ // The following fields are defined as Long because they may be null if we can't a value
+ // from the underlying executor.
+ private final Long largestQueueSize;
+ private final Long rejectedTaskCount;
+
+ @ConstructorProperties({"activeThreadCount","currentThreadPoolSize","largestThreadPoolSize",
+ "maxThreadPoolSize","currentQueueSize","largestQueueSize","maxQueueSize",
+ "completedTaskCount","totalTaskCount","rejectedTaskCount"})
+ public ThreadExecutorStats(long activeThreadCount, long currentThreadPoolSize,
+ long largestThreadPoolSize, long maxThreadPoolSize, long currentQueueSize,
+ Long largestQueueSize, long maxQueueSize, long completedTaskCount,
+ long totalTaskCount, Long rejectedTaskCount) {
+ this.activeThreadCount = activeThreadCount;
+ this.currentThreadPoolSize = currentThreadPoolSize;
+ this.largestQueueSize = largestQueueSize;
+ this.largestThreadPoolSize = largestThreadPoolSize;
+ this.maxThreadPoolSize = maxThreadPoolSize;
+ this.currentQueueSize = currentQueueSize;
+ this.maxQueueSize = maxQueueSize;
+ this.completedTaskCount = completedTaskCount;
+ this.totalTaskCount = totalTaskCount;
+ this.rejectedTaskCount = rejectedTaskCount;
+ }
+
+ public long getActiveThreadCount() {
+ return activeThreadCount;
+ }
+
+ public long getCompletedTaskCount() {
+ return completedTaskCount;
+ }
+
+ public Long getRejectedTaskCount() {
+ return rejectedTaskCount;
+ }
+
+ public long getCurrentQueueSize() {
+ return currentQueueSize;
+ }
+
+ public Long getLargestQueueSize() {
+ return largestQueueSize;
+ }
+
+ public long getMaxThreadPoolSize() {
+ return maxThreadPoolSize;
+ }
+
+ public long getTotalTaskCount() {
+ return totalTaskCount;
+ }
+
+ public long getLargestThreadPoolSize() {
+ return largestThreadPoolSize;
+ }
+
+ public long getMaxQueueSize() {
+ return maxQueueSize;
+ }
+
+ public long getCurrentThreadPoolSize() {
+ return currentThreadPoolSize;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+/**
+ * MXBean interface for thread executor statistic metrics.
+ *
+ * @author Thomas Pantelis
+ */
+public interface ThreadExecutorStatsMXBean {
+
+ /**
+ * Returns the current thread pool size.
+ */
+ long getCurrentThreadPoolSize();
+
+ /**
+ * Returns the largest thread pool size.
+ */
+ long getLargestThreadPoolSize();
+
+ /**
+ * Returns the maximum thread pool size.
+ */
+ long getMaxThreadPoolSize();
+
+ /**
+ * Returns the current queue size.
+ */
+ long getCurrentQueueSize();
+
+ /**
+ * Returns the largest queue size, if available.
+ */
+ Long getLargestQueueSize();
+
+ /**
+ * Returns the maximum queue size.
+ */
+ long getMaxQueueSize();
+
+ /**
+ * Returns the active thread count.
+ */
+ long getActiveThreadCount();
+
+ /**
+ * Returns the completed task count.
+ */
+ long getCompletedTaskCount();
+
+ /**
+ * Returns the total task count.
+ */
+ long getTotalTaskCount();
+
+ /**
+ * Returns the rejected task count, if available.
+ */
+ Long getRejectedTaskCount();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.util.jmx;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import javax.annotation.Nullable;
+import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
+import org.opendaylight.yangtools.util.concurrent.TrackingLinkedBlockingQueue;
+
+/**
+ * MXBean implementation of the ThreadExecutorStatsMXBean interface that retrieves statistics
+ * from a backing {@link java.util.concurrent.ExecutorService}.
+ *
+ * @author Thomas Pantelis
+ */
+public class ThreadExecutorStatsMXBeanImpl extends AbstractMXBean
+ implements ThreadExecutorStatsMXBean {
+
+ private final ThreadPoolExecutor executor;
+
+ /**
+ * Constructs an instance for the given {@link Executor}.
+ *
+ * @param executor the backing {@link Executor}
+ * @param mBeanName Used as the <code>name</code> property in the bean's ObjectName.
+ * @param mBeanType Used as the <code>type</code> property in the bean's ObjectName.
+ * @param mBeanCategory Used as the <code>Category</code> property in the bean's ObjectName.
+ */
+ public ThreadExecutorStatsMXBeanImpl(Executor executor, String mBeanName,
+ String mBeanType, @Nullable String mBeanCategory) {
+ super(mBeanName, mBeanType, mBeanCategory);
+
+ Preconditions.checkArgument(executor instanceof ThreadPoolExecutor,
+ "The ExecutorService of type {} is not an instanceof ThreadPoolExecutor",
+ executor.getClass());
+ this.executor = (ThreadPoolExecutor)executor;
+ }
+
+ @Override
+ public long getCurrentThreadPoolSize() {
+ return executor.getPoolSize();
+ }
+
+ @Override
+ public long getLargestThreadPoolSize() {
+ return executor.getLargestPoolSize();
+ }
+
+ @Override
+ public long getMaxThreadPoolSize() {
+ return executor.getMaximumPoolSize();
+ }
+
+ @Override
+ public long getCurrentQueueSize() {
+ return executor.getQueue().size();
+ }
+
+ @Override
+ public Long getLargestQueueSize() {
+ BlockingQueue<Runnable> queue = executor.getQueue();
+ if(queue instanceof TrackingLinkedBlockingQueue) {
+ return Long.valueOf(((TrackingLinkedBlockingQueue<?>)queue).getLargestQueueSize());
+ }
+
+ return null;
+ }
+
+ @Override
+ public long getMaxQueueSize() {
+ long queueSize = executor.getQueue().size();
+ return executor.getQueue().remainingCapacity() + queueSize;
+ }
+
+ @Override
+ public long getActiveThreadCount() {
+ return executor.getActiveCount();
+ }
+
+ @Override
+ public long getCompletedTaskCount() {
+ return executor.getCompletedTaskCount();
+ }
+
+ @Override
+ public long getTotalTaskCount() {
+ return executor.getTaskCount();
+ }
+
+ @Override
+ public Long getRejectedTaskCount() {
+ RejectedExecutionHandler rejectedHandler = executor.getRejectedExecutionHandler();
+ if(rejectedHandler instanceof CountingRejectedExecutionHandler) {
+ return Long.valueOf(((CountingRejectedExecutionHandler)rejectedHandler)
+ .getRejectedTaskCount());
+ }
+
+ return null;
+ }
+
+ /**
+ * Returns a {@link ThreadExecutorStats} instance containing a snapshot of the statistic
+ * metrics.
+ */
+ public ThreadExecutorStats toThreadExecutorStats() {
+ return new ThreadExecutorStats(getActiveThreadCount(), getCurrentThreadPoolSize(),
+ getLargestThreadPoolSize(), getMaxThreadPoolSize(), getCurrentQueueSize(),
+ getLargestQueueSize(), getMaxQueueSize(), getCompletedTaskCount(),
+ getTotalTaskCount(), getRejectedTaskCount());
+ }
+}
*/
package org.opendaylight.controller.config.yang.md.sal.dom.impl;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.controller.md.sal.dom.broker.impl.jmx.CommitStatsMXBeanImpl;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
public final class DomInmemoryDataBrokerModule extends
org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractDomInmemoryDataBrokerModule {
+ private static final String JMX_BEAN_TYPE = "DOMDataBroker";
+
public DomInmemoryDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
* nothing on success. The executor queue capacity is bounded and, if the capacity is
* reached, subsequent submitted tasks will block the caller.
*/
- Executor listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(
+ ExecutorService listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(
getMaxDataBrokerFutureCallbackPoolSize(), getMaxDataBrokerFutureCallbackQueueSize(),
"CommitFutures");
TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION,
listenableFutureExecutor));
+ final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl(
+ newDataBroker.getCommitStatsTracker(), JMX_BEAN_TYPE);
+ commitStatsMXBean.registerMBean();
+
+ final ThreadExecutorStatsMXBeanImpl commitExecutorStatsMXBean =
+ new ThreadExecutorStatsMXBeanImpl(commitExecutor, "CommitExecutorStats",
+ JMX_BEAN_TYPE, null);
+ commitExecutorStatsMXBean.registerMBean();
+
+ final ThreadExecutorStatsMXBeanImpl commitFutureStatsMXBean =
+ new ThreadExecutorStatsMXBeanImpl(listenableFutureExecutor,
+ "CommitFutureExecutorStats", JMX_BEAN_TYPE, null);
+ commitFutureStatsMXBean.registerMBean();
+
+ newDataBroker.setCloseable(new AutoCloseable() {
+ @Override
+ public void close() {
+ commitStatsMXBean.unregisterMBean();
+ commitExecutorStatsMXBean.unregisterMBean();
+ commitFutureStatsMXBean.unregisterMBean();
+ }
+ });
+
return newDataBroker;
}
}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.DurationStatsTracker;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final DOMDataCommitCoordinatorImpl coordinator;
private final AtomicLong txNum = new AtomicLong();
private final AtomicLong chainNum = new AtomicLong();
+ private volatile AutoCloseable closeable;
public DOMDataBrokerImpl(final ImmutableMap<LogicalDatastoreType, DOMStore> datastores,
final ListeningExecutorService executor) {
this.coordinator = new DOMDataCommitCoordinatorImpl(executor);
}
+ public void setCloseable(AutoCloseable closeable) {
+ this.closeable = closeable;
+ }
+
+ public DurationStatsTracker getCommitStatsTracker() {
+ return coordinator.getCommitStatsTracker();
+ }
+
+ @Override
+ public void close() {
+ super.close();
+
+ if(closeable != null) {
+ try {
+ closeable.close();
+ } catch(Exception e) {
+ LOG.debug("Error closing instance", e);
+ }
+ }
+ }
+
@Override
protected Object newTransactionIdentifier() {
return "DOM-" + txNum.getAndIncrement();
LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
}
-
}
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.DurationStatsTracker;
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final ListeningExecutorService executor;
+ private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
+
/**
*
* Construct DOMDataCommitCoordinator which uses supplied executor to
this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
}
+ public DurationStatsTracker getCommitStatsTracker() {
+ return commitStatsTracker;
+ }
+
@Override
public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
ListenableFuture<Void> commitFuture = null;
try {
- commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener));
+ commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
+ listener, commitStatsTracker));
} catch(RejectedExecutionException e) {
LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
executor, e);
private final DOMDataWriteTransaction tx;
private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
+ private final DurationStatsTracker commitStatTracker;
@GuardedBy("this")
private CommitPhase currentPhase;
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
- final Optional<DOMDataCommitErrorListener> listener) {
+ final Optional<DOMDataCommitErrorListener> listener,
+ final DurationStatsTracker commitStatTracker) {
this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
this.currentPhase = CommitPhase.SUBMITTED;
+ this.commitStatTracker = commitStatTracker;
}
@Override
public Void call() throws TransactionCommitFailedException {
+ long startTime = System.nanoTime();
try {
canCommitBlocking();
preCommitBlocking();
LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
abortBlocking(e);
throw e;
+ } finally {
+ if(commitStatTracker != null) {
+ commitStatTracker.addDuration(System.nanoTime() - startTime);
+ }
}
}
}
}
+ @Override
+ public String toString() {
+ return getDelegate().getClass().getName();
+ }
+
static final class TranslatingConfigListenerInvoker extends TranslatingListenerInvoker {
public TranslatingConfigListenerInvoker(final DataChangeListener listener, final DataNormalizer normalizer) {
super(listener, normalizer);
}
+ @Override
DataChangeEvent<YangInstanceIdentifier, CompositeNode> getLegacyEvent(final DataNormalizer normalizer, final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedChange) {
return TranslatingDataChangeEvent.createConfiguration(normalizedChange, normalizer);
}
super(listener, normalizer);
}
+ @Override
DataChangeEvent<YangInstanceIdentifier, CompositeNode> getLegacyEvent(final DataNormalizer normalizer, final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedChange) {
return TranslatingDataChangeEvent.createOperational(normalizedChange, normalizer);
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.jmx;
+
+/**
+ * MXBean interface for retrieving write Tx commit statistics.
+ *
+ * @author Thomas Pantelis
+ */
+public interface CommitStatsMXBean {
+
+ /**
+ * Returns the total number of commits that have occurred.
+ */
+ long getTotalCommits();
+
+ /**
+ * Returns a string representing the time duration of the longest commit, in the appropriate
+ * scaled units, along with the date/time that it occurred.
+ */
+ String getLongestCommitTime();
+
+ /**
+ * Returns a string representing the time duration of the shortest commit, in the appropriate
+ * scaled units, along with the date/time that it occurred.
+ */
+ String getShortestCommitTime();
+
+ /**
+ * Returns a string representing average commit time duration, in the appropriate
+ * scaled units.
+ */
+ String getAverageCommitTime();
+
+ /**
+ * Clears the current stats to their defaults.
+ */
+ void clearStats();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.jmx;
+
+import javax.annotation.Nonnull;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.yangtools.util.DurationStatsTracker;
+
+/**
+ * Implementation of the CommitStatsMXBean interface.
+ *
+ * @author Thomas Pantelis
+ */
+public class CommitStatsMXBeanImpl extends AbstractMXBean implements CommitStatsMXBean {
+
+ private final DurationStatsTracker commitStatsTracker;
+
+ /**
+ * Constructor.
+ *
+ * @param commitStatsTracker the DurationStatsTracker used to obtain the stats.
+ * @param mBeanType mBeanType Used as the <code>type</code> property in the bean's ObjectName.
+ */
+ public CommitStatsMXBeanImpl(@Nonnull DurationStatsTracker commitStatsTracker,
+ @Nonnull String mBeanType) {
+ super("CommitStats", mBeanType, null);
+ this.commitStatsTracker = commitStatsTracker;
+ }
+
+ @Override
+ public long getTotalCommits() {
+ return commitStatsTracker.getTotalDurations();
+ }
+
+ @Override
+ public String getLongestCommitTime() {
+ return commitStatsTracker.getDisplayableLongestDuration();
+ }
+
+ @Override
+ public String getShortestCommitTime() {
+ return commitStatsTracker.getDisplayableShortestDuration();
+ }
+
+ @Override
+ public String getAverageCommitTime() {
+ return commitStatsTracker.getDisplayableAverageDuration();
+ }
+
+ @Override
+ public void clearStats() {
+ commitStatsTracker.reset();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.broker.impl.jmx;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.opendaylight.yangtools.util.DurationStatsTracker;
+
+/**
+ * Unit tests for CommitStatsMXBeanImpl.
+ *
+ * @author Thomas Pantelis
+ */
+public class CommitStatsMXBeanImplTest {
+
+ @Test
+ public void test() {
+
+ DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
+ CommitStatsMXBeanImpl bean =
+ new CommitStatsMXBeanImpl(commitStatsTracker, "Test");
+
+ commitStatsTracker.addDuration(100);
+
+ String prefix = "100.0 ns";
+ assertEquals("getTotalCommits", 1L, bean.getTotalCommits());
+ assertEquals("getLongestCommitTime starts with \"" + prefix + "\"", true,
+ bean.getLongestCommitTime().startsWith("100.0 ns"));
+ assertEquals("getShortestCommitTime starts with \"" + prefix + "\"", true,
+ bean.getShortestCommitTime().startsWith(prefix));
+ assertEquals("getAverageCommitTime starts with \"" + prefix + "\"", true,
+ bean.getAverageCommitTime().startsWith(prefix));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.core.spi.data.statistics;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nonnull;
+
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
+
+/**
+ * Interface for a class that tracks statistics for a data store.
+ *
+ * @author Thomas Pantelis
+ */
+public interface DOMStoreStatsTracker {
+
+ /**
+ * Sets the executor used for DataChangeListener notifications.
+ *
+ * @param dclExecutor the executor
+ */
+ void setDataChangeListenerExecutor( @Nonnull ExecutorService dclExecutor );
+
+ /**
+ * Sets the executor used internally by the data store.
+ *
+ * @param dsExecutor the executor
+ */
+ void setDataStoreExecutor( @Nonnull ExecutorService dsExecutor );
+
+ /**
+ * Sets the QueuedNotificationManager use for DataChangeListener notifications,
+ *
+ * @param manager the manager
+ */
+ void setNotificationManager( @Nonnull QueuedNotificationManager<?, ?> manager );
+}
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
+import org.opendaylight.controller.md.sal.dom.store.impl.jmx.InMemoryDataStoreStats;
public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryConfigDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency(),
+
+ InMemoryDOMDataStore dataStore = InMemoryDOMDataStoreFactory.create(
+ "DOM-CFG", getSchemaServiceDependency(),
InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
- getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize()));
+ getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize(),
+ getMaxDataStoreExecutorQueueSize()));
+
+ InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryConfigDataStore",
+ dataStore.getDataChangeListenerNotificationManager(), dataStore.getDomStoreExecutor());
+
+ dataStore.setCloseable(statsBean);
+
+ return dataStore;
}
}
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
+import org.opendaylight.controller.md.sal.dom.store.impl.jmx.InMemoryDataStoreStats;
public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryOperationalDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return InMemoryDOMDataStoreFactory.create("DOM-OPER", getSchemaServiceDependency(),
+ InMemoryDOMDataStore dataStore = InMemoryDOMDataStoreFactory.create("DOM-OPER", getSchemaServiceDependency(),
InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
- getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize()));
- }
+ getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize(),
+ getMaxDataStoreExecutorQueueSize()));
+
+
+ InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryOperationalDataStore",
+ dataStore.getDataChangeListenerNotificationManager(), dataStore.getDomStoreExecutor());
+ dataStore.setCloseable(statsBean);
+
+ return dataStore;
+ }
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.ExecutorServiceUtil;
-import org.opendaylight.yangtools.util.concurrent.NotificationManager;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final AtomicLong txCounter = new AtomicLong(0);
private final ListeningExecutorService listeningExecutor;
- private final NotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
+ private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
private final ExecutorService dataChangeListenerExecutor;
+ private final ExecutorService domStoreExecutor;
+
private final String name;
- public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
+ private volatile AutoCloseable closeable;
+
+ public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
final ExecutorService dataChangeListenerExecutor) {
- this(name, listeningExecutor, dataChangeListenerExecutor,
- InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
+ this(name, domStoreExecutor, dataChangeListenerExecutor,
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
}
- public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
+ public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize) {
this.name = Preconditions.checkNotNull(name);
- this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor);
+ this.domStoreExecutor = Preconditions.checkNotNull(domStoreExecutor);
+ this.listeningExecutor = MoreExecutors.listeningDecorator(this.domStoreExecutor);
this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
dataChangeListenerNotificationManager =
"DataChangeListenerQueueMgr");
}
+ public void setCloseable(AutoCloseable closeable) {
+ this.closeable = closeable;
+ }
+
+ public QueuedNotificationManager<?, ?> getDataChangeListenerNotificationManager() {
+ return dataChangeListenerNotificationManager;
+ }
+
+ public ExecutorService getDomStoreExecutor() {
+ return domStoreExecutor;
+ }
+
@Override
public final String getIdentifier() {
return name;
public void close() {
ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
+
+ if(closeable != null) {
+ try {
+ closeable.close();
+ } catch(Exception e) {
+ LOG.debug("Error closing instance", e);
+ }
+ }
}
+
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
final YangInstanceIdentifier path, final L listener, final DataChangeScope scope) {
public static final int DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE = 1000;
public static final int DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE = 20;
public static final int DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE = 1000;
+ public static final int DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE = 5000;
private static final InMemoryDOMDataStoreConfigProperties DEFAULT =
create(DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE,
DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE,
- DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
+ DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE,
+ DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE);
private final int maxDataChangeExecutorQueueSize;
private final int maxDataChangeExecutorPoolSize;
private final int maxDataChangeListenerQueueSize;
+ private final int maxDataStoreExecutorQueueSize;
/**
* Constructs an instance with the given property values.
* maximum queue size for the data change notification executor.
* @param maxDataChangeListenerQueueSize
* maximum queue size for the data change listeners.
+ * @param maxDataStoreExecutorQueueSize
+ * maximum queue size for the data store executor.
*/
+ public static InMemoryDOMDataStoreConfigProperties create(int maxDataChangeExecutorPoolSize,
+ int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize,
+ int maxDataStoreExecutorQueueSize) {
+ return new InMemoryDOMDataStoreConfigProperties(maxDataChangeExecutorPoolSize,
+ maxDataChangeExecutorQueueSize, maxDataChangeListenerQueueSize,
+ maxDataStoreExecutorQueueSize);
+ }
+
public static InMemoryDOMDataStoreConfigProperties create(int maxDataChangeExecutorPoolSize,
int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize) {
return new InMemoryDOMDataStoreConfigProperties(maxDataChangeExecutorPoolSize,
- maxDataChangeExecutorQueueSize, maxDataChangeListenerQueueSize);
+ maxDataChangeExecutorQueueSize, maxDataChangeListenerQueueSize,
+ DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE);
}
/**
}
private InMemoryDOMDataStoreConfigProperties(int maxDataChangeExecutorPoolSize,
- int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize) {
+ int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize,
+ int maxDataStoreExecutorQueueSize) {
this.maxDataChangeExecutorQueueSize = maxDataChangeExecutorQueueSize;
this.maxDataChangeExecutorPoolSize = maxDataChangeExecutorPoolSize;
this.maxDataChangeListenerQueueSize = maxDataChangeListenerQueueSize;
+ this.maxDataStoreExecutorQueueSize = maxDataStoreExecutorQueueSize;
}
/**
public int getMaxDataChangeListenerQueueSize() {
return maxDataChangeListenerQueueSize;
}
+
+ /**
+ * Returns the maximum queue size for the data store executor.
+ */
+ public int getMaxDataStoreExecutorQueueSize() {
+ return maxDataStoreExecutorQueueSize;
+ }
}
package org.opendaylight.controller.md.sal.dom.store.impl;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
-import com.google.common.util.concurrent.MoreExecutors;
/**
* A factory for creating InMemoryDOMDataStore instances.
ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
+ ExecutorService domStoreExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
+ actualProperties.getMaxDataStoreExecutorQueueSize(), "DOMStore-" + name );
+
InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
- MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()),
- dataChangeListenerExecutor, actualProperties.getMaxDataChangeListenerQueueSize());
+ domStoreExecutor, dataChangeListenerExecutor,
+ actualProperties.getMaxDataChangeListenerQueueSize());
if(schemaService != null) {
schemaService.registerSchemaContextListener(dataStore);
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.store.impl.jmx;
+
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
+
+/**
+ * Wrapper class for data store MXbeans.
+ *
+ * @author Thomas Pantelis
+ */
+public class InMemoryDataStoreStats implements AutoCloseable {
+
+ private final ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
+ private final ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
+ private final QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
+
+ public InMemoryDataStoreStats(String mBeanType, QueuedNotificationManager<?, ?> manager,
+ ExecutorService dataStoreExecutor) {
+
+ this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
+ "notification-manager", mBeanType, null);
+ notificationManagerStatsBean.registerMBean();
+
+ this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+ "notification-executor", mBeanType, null);
+ this.notificationExecutorStatsBean.registerMBean();
+
+ this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dataStoreExecutor,
+ "data-store-executor", mBeanType, null);
+ this.dataStoreExecutorStatsBean.registerMBean();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if(notificationExecutorStatsBean != null) {
+ notificationExecutorStatsBean.unregisterMBean();
+ }
+
+ if(dataStoreExecutorStatsBean != null) {
+ dataStoreExecutorStatsBean.unregisterMBean();
+ }
+
+ if(notificationManagerStatsBean != null) {
+ notificationManagerStatsBean.unregisterMBean();
+ }
+ }
+}
type uint16;
description "The maximum queue size for the data change listeners.";
}
+
+ leaf max-data-store-executor-queue-size {
+ default 5000;
+ type uint16;
+ description "The maximum queue size for the data store executor.";
+ }
}
// Augments the 'configuration' choice node under modules/module.