private final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
private MeteredMessageQueue queue;
- private Integer capacity;
- private FiniteDuration pushTimeOut;
- private MetricRegistry registry;
+ private final Integer capacity;
+ private final FiniteDuration pushTimeOut;
+ private final MetricRegistry registry;
private final String QUEUE_SIZE = "q-size";
this.capacity = commonConfig.getMailBoxCapacity();
this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
- MetricsReporter reporter = MetricsReporter.getInstance();
+ MetricsReporter reporter = MetricsReporter.getInstance(MeteringBehavior.DOMAIN);
registry = reporter.getMetricsRegistry();
}
String metricName = MetricRegistry.name(actorName, QUEUE_SIZE);
if (registry.getMetrics().containsKey(metricName))
+ {
return; //already registered
+ }
Gauge<Integer> queueSize = getQueueSizeGuage(monitoredQueue);
registerQueueSizeMetric(metricName, queueSize);
* The information is reported to {@link org.opendaylight.controller.cluster.reporting.MetricsReporter}
*/
public class MeteringBehavior implements Procedure<Object> {
+ public static final String DOMAIN = "org.opendaylight.controller.actor.metric";
private final UntypedActor meteredActor;
- private final MetricRegistry METRICREGISTRY = MetricsReporter.getInstance().getMetricsRegistry();
+ private final MetricRegistry METRICREGISTRY = MetricsReporter.getInstance(DOMAIN).getMetricsRegistry();
private final String MSG_PROCESSING_RATE = "msg-rate";
private String actorQualifiedName;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
/**
* Maintains metrics registry that is provided to reporters.
*/
public class MetricsReporter implements AutoCloseable {
- private static final MetricRegistry METRICS_REGISTRY = new MetricRegistry();
- private static final String DOMAIN = "org.opendaylight.controller.actor.metric";
- private static final MetricsReporter INSTANCE = new MetricsReporter();
-
- private final JmxReporter jmxReporter = JmxReporter.forRegistry(METRICS_REGISTRY).inDomain(DOMAIN).build();
-
- private MetricsReporter() {
+ // One lazily-built reporter per JMX domain name, replacing the former single
+ // static INSTANCE so each domain gets its own MetricRegistry and JmxReporter.
+ private static LoadingCache<String, MetricsReporter> METRIC_REPORTERS = CacheBuilder.newBuilder().build(
+ new CacheLoader<String, MetricsReporter>() {
+ @Override
+ public MetricsReporter load(String domainName) {
+ return new MetricsReporter(domainName);
+ }
+ });
+
+ private final String domainName;
+ private final JmxReporter jmxReporter;
+ private final MetricRegistry metricRegistry = new MetricRegistry();
+
+ private MetricsReporter(String domainName) {
+ this.domainName = domainName;
+ // Publish this instance's registry over JMX under the given domain.
+ jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(domainName).build();
jmxReporter.start();
}
- public static MetricsReporter getInstance() {
- return INSTANCE;
+ // Returns the reporter for the given JMX domain, creating it on first use.
+ public static MetricsReporter getInstance(String domainName) {
+ return METRIC_REPORTERS.getUnchecked(domainName);
}
public MetricRegistry getMetricsRegistry() {
- return METRICS_REGISTRY;
+ // Per-instance registry (previously a single shared static registry).
+ return metricRegistry;
}
@Override
public void close() {
jmxReporter.close();
+
+ // Drop this instance from the cache so a later getInstance(domainName)
+ // builds a fresh reporter instead of returning this closed one.
+ METRIC_REPORTERS.invalidate(domainName);
}
}
package org.opendaylight.controller.cluster.datastore;
import akka.util.Timeout;
+import com.google.common.collect.Sets;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.text.WordUtils;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
* @author Thomas Pantelis
*/
public class DatastoreContext {
+ public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
public static final int DEFAULT_OPERATION_TIMEOUT_IN_SECONDS = 5;
public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100;
+ private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
+
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
private int operationTimeoutInSeconds = DEFAULT_OPERATION_TIMEOUT_IN_SECONDS;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
private boolean writeOnlyTransactionOptimizationsEnabled = false;
+ /**
+ * Returns the data store type names for which a DatastoreContext has been built
+ * (each Builder.build() with a non-null dataStoreType registers the type here).
+ */
+ public static Set<String> getGlobalDatastoreTypes() {
+ return globalDatastoreTypes;
+ }
+
private DatastoreContext() {
setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
+
+ if(datastoreContext.dataStoreType != null) {
+ globalDatastoreTypes.add(datastoreContext.dataStoreType);
+ }
+
return datastoreContext;
}
}
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
- private CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
+ private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
+
+ private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
private final String type;
datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType());
datastoreConfigMXBean.setContext(datastoreContext);
datastoreConfigMXBean.registerMBean();
+
+ datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContext.getDataStoreMXBeanType(), actorContext);
+ datastoreInfoMXBean.registerMBean();
}
public DistributedDataStore(ActorContext actorContext) {
@Override
public void close() {
datastoreConfigMXBean.unregisterMBean();
+ datastoreInfoMXBean.unregisterMBean();
if(closeable != null) {
try {
if(txFutureCallbackMap.size() == 0) {
onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
+ TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
}
Preconditions.checkState(timerContext != null, "Call run before success");
timerContext.stop();
+ double newRateLimit = calculateNewRateLimit(commitTimer, actorContext.getDatastoreContext());
+
+ LOG.debug("Data Store {} commit rateLimit adjusted to {}", actorContext.getDataStoreType(), newRateLimit);
+
+ actorContext.setTxCreationLimit(newRateLimit);
+ }
+
+ @Override
+ public void failure() {
+ // This would mean we couldn't get a transaction completed in 30 seconds which is
+ // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
+ // not going to be useful - so we leave it as it is
+ }
+
+ /**
+ * Computes a new transaction creation rate limit (tx/sec) from the commit timer's
+ * latency percentiles and the configured shard commit timeout.
+ *
+ * @param commitTimer the commit operation timer; may be null (e.g. in unit tests)
+ * @param context the datastore context supplying the commit timeout
+ * @return the computed rate limit, or 0 if no timer or no latency data is available
+ */
+ private static double calculateNewRateLimit(Timer commitTimer, DatastoreContext context) {
+ if(commitTimer == null) {
+ // This can happen in unit tests.
+ return 0;
+ }
+
Snapshot timerSnapshot = commitTimer.getSnapshot();
double newRateLimit = 0;
- long commitTimeoutInSeconds = actorContext.getDatastoreContext()
- .getShardTransactionCommitTimeoutInSeconds();
+ long commitTimeoutInSeconds = context.getShardTransactionCommitTimeoutInSeconds();
long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
// Find the time that it takes for transactions to get executed in every 10th percentile
if(percentileTimeInNanos > 0) {
// Figure out the rate limit for the i*10th percentile in nanos
- double percentileRateLimit = ((double) commitTimeoutInNanos / percentileTimeInNanos);
+ // NOTE(review): the explicit (double) cast was dropped here — confirm that
+ // percentileTimeInNanos is a double, otherwise this becomes integer division.
+ double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos);
// Add the percentileRateLimit to the total rate limit
newRateLimit += percentileRateLimit;
}
// Compute the rate limit per second
- newRateLimit = newRateLimit/(commitTimeoutInSeconds*10);
-
- LOG.debug("Data Store {} commit rateLimit adjusted to {}", actorContext.getDataStoreType(), newRateLimit);
-
- actorContext.setTxCreationLimit(newRateLimit);
+ return newRateLimit/(commitTimeoutInSeconds*10);
}
- @Override
- public void failure() {
- // This would mean we couldn't get a transaction completed in 30 seconds which is
- // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
- // not going to be useful - so we leave it as it is
+ /**
+ * Adjusts the tx creation rate limit for a data store whose transaction was readied
+ * without modifications: if this store's own commit timer yields a rate of 0, the rate
+ * computed from another registered data store that does have commit data is adopted.
+ *
+ * @param actorContext the actor context of the data store with the unused transaction
+ */
+ public static void adjustRateLimitForUnusedTransaction(ActorContext actorContext) {
+ // Unused transactions in one data store can artificially limit the rate for other data stores
+ // if the first data store's rate is still at a lower initial rate since the front-end creates
+ // transactions in each data store up-front even though the client may not actually submit changes.
+ // So we may have to adjust the rate for data stores with unused transactions.
+
+ // First calculate the current rate for the data store. If it's 0 then there have been no
+ // actual transactions committed to the data store.
+
+ double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(COMMIT),
+ actorContext.getDatastoreContext());
+ if(newRateLimit == 0.0) {
+ // Since we have no rate data for unused Tx's data store, adjust to the rate from another
+ // data store that does have rate data.
+ for(String datastoreType: DatastoreContext.getGlobalDatastoreTypes()) {
+ if(datastoreType.equals(actorContext.getDataStoreType())) {
+ continue;
+ }
+
+ newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(datastoreType, COMMIT),
+ actorContext.getDatastoreContext());
+ if(newRateLimit > 0.0) {
+ LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}",
+ actorContext.getDataStoreType(), newRateLimit);
+
+ actorContext.setTxCreationLimit(newRateLimit);
+ break;
+ }
+ }
+ }
}
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
+
+/**
+ * JMX bean for general datastore info.
+ *
+ * @author Thomas Pantelis
+ */
+public interface DatastoreInfoMXBean {
+ /**
+ * Returns the current transaction creation rate limit (transactions/sec).
+ */
+ double getTransactionCreationRateLimit();
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
+
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+
+/**
+ * Implementation of DatastoreInfoMXBean.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreInfoMXBeanImpl extends AbstractMXBean implements DatastoreInfoMXBean {
+
+ private final ActorContext actorContext;
+
+ /**
+ * @param mxBeanType the data store MXBean type, used as part of the JMX object name
+ * @param actorContext the actor context queried for runtime values
+ */
+ public DatastoreInfoMXBeanImpl(String mxBeanType, ActorContext actorContext) {
+ super("GeneralRuntimeInfo", mxBeanType, null);
+ this.actorContext = actorContext;
+ }
+
+
+ @Override
+ public double getTransactionCreationRateLimit() {
+ // Reads the live value from the ActorContext's rate limiter.
+ return actorContext.getTxCreationLimit();
+ }
+}
import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
-import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*/
public class ActorContext {
private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
- private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
private static final String METRIC_RATE = "rate";
- private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
new Mapper<Throwable, Throwable>() {
@Override
private Timeout operationTimeout;
private final String selfAddressHostPort;
private RateLimiter txRateLimiter;
- private final MetricRegistry metricRegistry = new MetricRegistry();
- private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
+ private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
}
transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
- jmxReporter.start();
-
}
private void setCachedProperties() {
* @return
*/
public Timer getOperationTimer(String operationName){
- final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
+ return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
+ }
+
+ public Timer getOperationTimer(String dataStoreType, String operationName){
+ final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
+ operationName, METRIC_RATE);
return metricRegistry.timer(rate);
}
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
}
+ @Test
+ public void testAdjustRateLimitForUnusedTransaction() {
+ doReturn(commitTimer).when(actorContext).getOperationTimer("one", "commit");
+ doReturn("one").when(actorContext).getDataStoreType();
+
+ Timer commitTimer2 = Mockito.mock(Timer.class);
+ Snapshot commitSnapshot2 = Mockito.mock(Snapshot.class);
+
+ doReturn(commitSnapshot2).when(commitTimer2).getSnapshot();
+
+ doReturn(commitTimer2).when(actorContext).getOperationTimer("two", "commit");
+
+ // Register both data store types in the global set consulted by the callback.
+ DatastoreContext.newBuilder().dataStoreType("one").build();
+ DatastoreContext.newBuilder().dataStoreType("two").build();
+
+ // Phase 1: "one" (the actor's own store) has commit latency data, so its own
+ // computed rate is non-zero and no adjustment should be made.
+ doReturn(TimeUnit.MICROSECONDS.toNanos(500) * 1D).when(commitSnapshot).getValue(1 * 0.1);
+
+ TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
+
+ verify(actorContext, never()).setTxCreationLimit(anyDouble());
+
+ Mockito.reset(commitSnapshot);
+
+ // Phase 2: own rate now computes to 0, but "two" has no latency data either —
+ // still no adjustment expected.
+ TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
+
+ verify(actorContext, never()).setTxCreationLimit(anyDouble());
+
+ // NOTE(review): leftover debug println — consider removing before merge.
+ System.out.println(""+TimeUnit.SECONDS.toNanos(30)/TimeUnit.MICROSECONDS.toNanos(100));
+
+ // Phase 3: give "two" commit latency data; its computed rate (~1000) should be adopted.
+ doReturn(TimeUnit.MICROSECONDS.toNanos(100) * 1D).when(commitSnapshot2).getValue(1 * 0.1);
+
+ TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
+
+ verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(1000)));
+ }
+
public Matcher<Double> approximately(final double val){
return new BaseMatcher<Double>() {
@Override
public boolean matches(Object o) {
Double aDouble = (Double) o;
- return aDouble > val && aDouble < val+1;
+ return aDouble >= val && aDouble <= val+1;
}
@Override