From 2a01b2263488748bd07d224a01b23f5550274447 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Wed, 4 Feb 2015 18:19:14 -0800 Subject: [PATCH] BUG 2676 : Apply backpressure when creating transactions This patch uses a rate limiter to limit the number of write transactions that can be created on the datastore at any given point of time (within a second). We also have added a metric that is trackable using JMX which will show exactly how much time we take in committing a transaction. Could this rate limiter have been on the DataBroker? Yes it could have been but I think we need to go granular on this one. Ideally the ratelimiter would be for every shard transaction but it's not trivial to do in a way that would actually block the consumer for now - this is because creating a transaction on Shard itself is an async operation. Change-Id: I3d8e3baeb892ba494c0ab0d13a0d6226c1516511 Signed-off-by: Moiz Raja --- .../cluster/datastore/DatastoreContext.java | 18 ++- .../datastore/DistributedDataStore.java | 4 +- .../ThreePhaseCommitCohortProxy.java | 104 ++++++++++++++++-- .../datastore/TransactionChainProxy.java | 2 + .../cluster/datastore/utils/ActorContext.java | 91 +++++++++++++-- ...tributedConfigDataStoreProviderModule.java | 1 + ...tedOperationalDataStoreProviderModule.java | 1 + .../yang/distributed-datastore-provider.yang | 8 ++ .../datastore/DistributedDataStoreTest.java | 60 ++++++++++ .../ThreePhaseCommitCohortProxyTest.java | 51 ++++++++- .../datastore/TransactionChainProxyTest.java | 38 +++++++ .../datastore/utils/ActorContextTest.java | 33 ++++++ 12 files changed, 387 insertions(+), 24 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 01e42dbb8e..d2e8b0efd8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -37,13 +37,15 @@ public class DatastoreContext { private final boolean persistent; private final ConfigurationReader configurationReader; private final long shardElectionTimeoutFactor; + private final long transactionCreationInitialRateLimit; private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties, ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds, Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds, int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout, Timeout shardLeaderElectionTimeout, - boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor) { + boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor, + long transactionCreationInitialRateLimit) { this.dataStoreProperties = dataStoreProperties; this.shardRaftConfig = shardRaftConfig; this.dataStoreMXBeanType = dataStoreMXBeanType; @@ -56,6 +58,7 @@ public class DatastoreContext { this.persistent = persistent; this.configurationReader = configurationReader; this.shardElectionTimeoutFactor = shardElectionTimeoutFactor; + this.transactionCreationInitialRateLimit = transactionCreationInitialRateLimit; } public static Builder newBuilder() { @@ -110,6 +113,11 @@ public class DatastoreContext { return this.shardElectionTimeoutFactor; } + + public long getTransactionCreationInitialRateLimit() { + return transactionCreationInitialRateLimit; + } + public static class Builder { private InMemoryDOMDataStoreConfigProperties dataStoreProperties; private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES); @@ -127,6 +135,7 @@ public class DatastoreContext { private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10; private int shardSnapshotDataThresholdPercentage = 12; private long shardElectionTimeoutFactor = 2; + private long transactionCreationInitialRateLimit = 100; public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) { this.shardTransactionIdleTimeout = shardTransactionIdleTimeout; @@ -209,6 +218,11 @@ public class DatastoreContext { return this; } + public Builder transactionCreationInitialRateLimit(long initialRateLimit){ + this.transactionCreationInitialRateLimit = initialRateLimit; + return this; + } + public DatastoreContext build() { DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); @@ -225,7 +239,7 @@ public class DatastoreContext { operationTimeoutInSeconds, shardTransactionIdleTimeout, shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity, shardInitializationTimeout, shardLeaderElectionTimeout, - persistent, configurationReader, shardElectionTimeoutFactor); + persistent, configurationReader, shardElectionTimeoutFactor, transactionCreationInitialRateLimit); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 930c5f7257..e9038ed4f0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -54,7 +54,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au actorContext = new ActorContext(actorSystem, actorSystem.actorOf( ShardManager.props(type, cluster, configuration, datastoreContext) .withMailbox(ActorContext.MAILBOX), shardManagerId ), - cluster, configuration, datastoreContext); + cluster, configuration, datastoreContext, type); } public DistributedDataStore(ActorContext actorContext) { @@ -94,11 +94,13 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { + actorContext.acquireTxCreationPermit(); return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { + actorContext.acquireTxCreationPermit(); return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 932c36fe34..4f472266c1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -11,12 +11,15 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.Futures; import akka.dispatch.OnComplete; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; @@ -44,6 +47,19 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho private final List> cohortFutures; private volatile List cohorts; private final String transactionId; + private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() { + @Override + public void run() { + } + + @Override + public void success() { + } + + @Override + public void failure() { + } + }; public ThreePhaseCommitCohortProxy(ActorContext actorContext, List> cohortFutures, String transactionId) { @@ -151,8 +167,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho if(LOG.isDebugEnabled()) { LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort); } - - futureList.add(actorContext.executeOperationAsync(cohort, message)); + futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout())); } return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher()); @@ -179,12 +194,20 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho @Override public ListenableFuture commit() { - return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(), - CommitTransactionReply.SERIALIZABLE_CLASS, true); + OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK : + new CommitCallback(actorContext); + + return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(), + CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback); + } + + private ListenableFuture voidOperation(final String operationName, final Object message, + final Class expectedResponseClass, final boolean propagateException) { + return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK); } private ListenableFuture voidOperation(final String operationName, final Object message, - final Class expectedResponseClass, final boolean propagateException) { + final Class expectedResponseClass, final boolean propagateException, final OperationCallback callback) { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} {}", transactionId, operationName); @@ -196,7 +219,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho if(cohorts != null) { finishVoidOperation(operationName, message, expectedResponseClass, propagateException, - returnFuture); + returnFuture, callback); } else { buildCohortList().onComplete(new OnComplete() { @Override @@ -213,7 +236,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } } else { finishVoidOperation(operationName, message, expectedResponseClass, - propagateException, returnFuture); + propagateException, returnFuture, callback); } } }, actorContext.getActorSystem().dispatcher()); @@ -223,11 +246,14 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } private void finishVoidOperation(final String operationName, final Object message, - final Class expectedResponseClass, final boolean propagateException, - final SettableFuture returnFuture) { + final Class expectedResponseClass, final boolean propagateException, + final SettableFuture returnFuture, final OperationCallback callback) { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} finish {}", transactionId, operationName); } + + callback.run(); + Future> combinedFuture = invokeCohorts(message); combinedFuture.onComplete(new OnComplete>() { @@ -247,6 +273,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } if(exceptionToPropagate != null) { + if(LOG.isDebugEnabled()) { LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId, operationName, exceptionToPropagate); @@ -265,11 +292,16 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } returnFuture.set(null); } + + callback.failure(); } else { + if(LOG.isDebugEnabled()) { LOG.debug("Tx {}: {} succeeded", transactionId, operationName); } returnFuture.set(null); + + callback.success(); } } }, actorContext.getActorSystem().dispatcher()); @@ -279,4 +311,58 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho List> getCohortFutures() { return Collections.unmodifiableList(cohortFutures); } + + private static interface OperationCallback { + void run(); + void success(); + void failure(); + } + + private static class CommitCallback implements OperationCallback{ + + private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class); + private static final String COMMIT = "commit"; + + private final Timer commitTimer; + private final ActorContext actorContext; + private Timer.Context timerContext; + + CommitCallback(ActorContext actorContext){ + this.actorContext = actorContext; + commitTimer = actorContext.getOperationTimer(COMMIT); + } + + @Override + public void run() { + timerContext = commitTimer.time(); + } + + @Override + public void success() { + timerContext.stop(); + + Snapshot timerSnapshot = commitTimer.getSnapshot(); + double allowedLatencyInNanos = timerSnapshot.get98thPercentile(); + + long commitTimeoutInSeconds = actorContext.getDatastoreContext() + .getShardTransactionCommitTimeoutInSeconds(); + long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds); + + // Here we are trying to find out how many transactions per second are allowed + double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds; + + LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}", + actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos); + + actorContext.setTxCreationLimit(newRateLimit); + } + + @Override + public void failure() { + // This would mean we couldn't get a transaction completed in 30 seconds which is + // the default transaction commit timeout. Using the timeout information to figure out the rate limit is + // not going to be useful - so we leave it as it is + } + } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 87959efe8a..ee3a5cc825 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -104,11 +104,13 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { + actorContext.acquireTxCreationPermit(); return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { + actorContext.acquireTxCreationPermit(); return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index c9fdf38931..7138a4a6e7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -18,9 +18,13 @@ import akka.actor.PoisonPill; import akka.dispatch.Mapper; import akka.pattern.AskTimeoutException; import akka.util.Timeout; +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -54,11 +58,11 @@ import scala.concurrent.duration.FiniteDuration; * but should not be passed to actors especially remote actors */ public class ActorContext { - private static final Logger - LOG = LoggerFactory.getLogger(ActorContext.class); - - public static final String MAILBOX = "bounded-mailbox"; - + private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class); + private static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; + private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store"; + private static final String METRIC_RATE = "rate"; + private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore"; private static final Mapper FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper() { @Override @@ -74,36 +78,47 @@ public class ActorContext { return actualFailure; } }; + public static final String MAILBOX = "bounded-mailbox"; private final ActorSystem actorSystem; private final ActorRef shardManager; private final ClusterWrapper clusterWrapper; private final Configuration configuration; private final DatastoreContext datastoreContext; - private volatile SchemaContext schemaContext; + private final String dataStoreType; private final FiniteDuration operationDuration; private final Timeout operationTimeout; private final String selfAddressHostPort; + private final RateLimiter txRateLimiter; + private final MetricRegistry metricRegistry = new MetricRegistry(); + private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; + private final Timeout transactionCommitOperationTimeout; + + private volatile SchemaContext schemaContext; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { this(actorSystem, shardManager, clusterWrapper, configuration, - DatastoreContext.newBuilder().build()); + DatastoreContext.newBuilder().build(), UNKNOWN_DATA_STORE_TYPE); } public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration, - DatastoreContext datastoreContext) { + DatastoreContext datastoreContext, String dataStoreType) { this.actorSystem = actorSystem; this.shardManager = shardManager; this.clusterWrapper = clusterWrapper; this.configuration = configuration; this.datastoreContext = datastoreContext; + this.dataStoreType = dataStoreType; + this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); - operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), - TimeUnit.SECONDS); + operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); operationTimeout = new Timeout(operationDuration); + transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), + TimeUnit.SECONDS)); + Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -113,6 +128,7 @@ public class ActorContext { } transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity(); + jmxReporter.start(); } public DatastoreContext getDatastoreContext() { @@ -446,4 +462,59 @@ public class ActorContext { public int getTransactionOutstandingOperationLimit(){ return transactionOutstandingOperationLimit; } + + /** + * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow + * us to create a timer for pretty much anything. + * + * @param operationName + * @return + */ + public Timer getOperationTimer(String operationName){ + final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName, METRIC_RATE); + return metricRegistry.timer(rate); + } + + /** + * Get the type of the data store to which this ActorContext belongs + * + * @return + */ + public String getDataStoreType() { + return dataStoreType; + } + + /** + * Set the number of transaction creation permits that are to be allowed + * + * @param permitsPerSecond + */ + public void setTxCreationLimit(double permitsPerSecond){ + txRateLimiter.setRate(permitsPerSecond); + } + + /** + * Get the current transaction creation rate limit + * @return + */ + public double getTxCreationLimit(){ + return txRateLimiter.getRate(); + } + + /** + * Try to acquire a transaction creation permit. Will block if no permits are available. + */ + public void acquireTxCreationPermit(){ + txRateLimiter.acquire(); + } + + /** + * Return the operation timeout to be used when committing transactions + * @return + */ + public Timeout getTransactionCommitOperationTimeout(){ + return transactionCommitOperationTimeout; + } + + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index 711c6a37b5..253422122d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -67,6 +67,7 @@ public class DistributedConfigDataStoreProviderModule extends .shardIsolatedLeaderCheckIntervalInMillis( props.getShardIsolatedLeaderCheckIntervalInMillis().getValue()) .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue()) + .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue()) .build(); return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index d9df06df1c..d41b83d0d2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -67,6 +67,7 @@ public class DistributedOperationalDataStoreProviderModule extends .shardIsolatedLeaderCheckIntervalInMillis( props.getShardIsolatedLeaderCheckIntervalInMillis().getValue()) .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue()) + .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue()) .build(); return DistributedDataStoreFactory.createInstance("operational", diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index 46cd50d0c1..e2ee7373d0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -180,6 +180,14 @@ module distributed-datastore-provider { description "The interval at which the leader of the shard will check if its majority followers are active and term itself as isolated"; } + + leaf tx-creation-initial-rate-limit { + default 100; + type non-zero-uint32-type; + description "The initial number of transactions per second that are allowed before the data store + should begin applying back pressure. This number is only used as an initial guidance, + subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit"; + } } // Augments the 'configuration' choice node under modules/module. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java new file mode 100644 index 0000000000..66fa876277 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -0,0 +1,60 @@ +package org.opendaylight.controller.cluster.datastore; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +public class DistributedDataStoreTest extends AbstractActorTest { + + private SchemaContext schemaContext; + + @Mock + private ActorContext actorContext; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + + schemaContext = TestModel.createTestContext(); + + doReturn(schemaContext).when(actorContext).getSchemaContext(); + } + + @Test + public void testRateLimitingUsedInReadWriteTxCreation(){ + DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext); + + distributedDataStore.newReadWriteTransaction(); + + verify(actorContext, times(1)).acquireTxCreationPermit(); + } + + @Test + public void testRateLimitingUsedInWriteOnlyTxCreation(){ + DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext); + + distributedDataStore.newWriteOnlyTransaction(); + + verify(actorContext, times(1)).acquireTxCreationPermit(); + } + + + @Test + public void testRateLimitingNotUsedInReadOnlyTxCreation(){ + DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext); + + distributedDataStore.newReadOnlyTransaction(); + distributedDataStore.newReadOnlyTransaction(); + distributedDataStore.newReadOnlyTransaction(); + + verify(actorContext, times(0)).acquireTxCreationPermit(); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 75c93dd5d2..d2396e0524 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -3,14 +3,20 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import akka.actor.ActorPath; import akka.actor.ActorSelection; import akka.actor.Props; import akka.dispatch.Futures; +import akka.util.Timeout; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import java.util.List; @@ -43,11 +49,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { @Mock private ActorContext actorContext; + @Mock + private DatastoreContext datastoreContext; + + @Mock + private Timer commitTimer; + + @Mock + private Timer.Context commitTimerContext; + + @Mock + private Snapshot commitSnapshot; + @Before public void setUp() { MockitoAnnotations.initMocks(this); doReturn(getSystem()).when(actorContext).getActorSystem(); + doReturn(datastoreContext).when(actorContext).getDatastoreContext(); + doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds(); + doReturn(commitTimer).when(actorContext).getOperationTimer("commit"); + doReturn(commitTimerContext).when(commitTimer).time(); + doReturn(commitSnapshot).when(commitTimer).getSnapshot(); + doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile(); + doReturn(10.0).when(actorContext).getTxCreationLimit(); } private Future newCohort() { @@ -86,12 +111,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { } stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class), - isA(requestType)); + isA(requestType), any(Timeout.class)); } private void verifyCohortInvocations(int nCohorts, Class requestType) { verify(actorContext, times(nCohorts)).executeOperationAsync( - any(ActorSelection.class), isA(requestType)); + any(ActorSelection.class), isA(requestType), any(Timeout.class)); } private void propagateExecutionExceptionCause(ListenableFuture future) throws Throwable { @@ -276,8 +301,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { try { propagateExecutionExceptionCause(proxy.commit()); } finally { + + verify(actorContext, never()).setTxCreationLimit(anyLong()); verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS); } + } @Test @@ -294,11 +322,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), new CommitTransactionReply()); + assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + proxy.canCommit().get(5, TimeUnit.SECONDS); proxy.preCommit().get(5, TimeUnit.SECONDS); proxy.commit().get(5, TimeUnit.SECONDS); verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS); + + // Verify that the creation limit was changed to 0.5 (based on setup) + verify(actorContext, timeout(5000)).setTxCreationLimit(0.5); + } + + @Test + public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(0); + + assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + + proxy.canCommit().get(5, TimeUnit.SECONDS); + proxy.preCommit().get(5, TimeUnit.SECONDS); + proxy.commit().get(5, TimeUnit.SECONDS); + + verify(actorContext, never()).setTxCreationLimit(anyLong()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index dd37371a45..23c3a82a38 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -11,12 +11,15 @@ package org.opendaylight.controller.cluster.datastore; import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; @@ -29,10 +32,17 @@ public class TransactionChainProxyTest extends AbstractActorTest{ ActorContext actorContext = null; SchemaContext schemaContext = mock(SchemaContext.class); + @Mock + ActorContext mockActorContext; + @Before public void setUp() { + MockitoAnnotations.initMocks(this); + actorContext = new MockActorContext(getSystem()); actorContext.setSchemaContext(schemaContext); + + doReturn(schemaContext).when(mockActorContext).getSchemaContext(); } @SuppressWarnings("resource") @@ -76,4 +86,32 @@ public class TransactionChainProxyTest extends AbstractActorTest{ Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId()); } + + @Test + public void testRateLimitingUsedInReadWriteTxCreation(){ + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + txChainProxy.newReadWriteTransaction(); + + verify(mockActorContext, times(1)).acquireTxCreationPermit(); + } + + @Test + public void testRateLimitingUsedInWriteOnlyTxCreation(){ + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + txChainProxy.newWriteOnlyTransaction(); + + verify(mockActorContext, times(1)).acquireTxCreationPermit(); + } + + + @Test + public void testRateLimitingNotUsedInReadOnlyTxCreation(){ + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + txChainProxy.newReadOnlyTransaction(); + + verify(mockActorContext, times(0)).acquireTxCreationPermit(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index e4ab969f5c..16db2be185 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -12,10 +13,12 @@ import akka.japi.Creator; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.time.StopWatch; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; @@ -265,4 +268,34 @@ public class ActorContextTest extends AbstractActorTest{ assertEquals(expected, actual); } + @Test + public void testRateLimiting(){ + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext, "config"); + + // Check that the initial value is being picked up from DataStoreContext + assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); + + actorContext.setTxCreationLimit(1.0); + + assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15); + + + StopWatch watch = new StopWatch(); + + watch.start(); + + actorContext.acquireTxCreationPermit(); + actorContext.acquireTxCreationPermit(); + actorContext.acquireTxCreationPermit(); + + watch.stop(); + + assertTrue("did not take as much time as expected", watch.getTime() > 1000); + } } -- 2.36.6