private final boolean persistent;
private final ConfigurationReader configurationReader;
private final long shardElectionTimeoutFactor;
+ private final long transactionCreationInitialRateLimit;
private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds,
Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds,
int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout,
Timeout shardLeaderElectionTimeout,
- boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor) {
+ boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor,
+ long transactionCreationInitialRateLimit) {
this.dataStoreProperties = dataStoreProperties;
this.shardRaftConfig = shardRaftConfig;
this.dataStoreMXBeanType = dataStoreMXBeanType;
this.persistent = persistent;
this.configurationReader = configurationReader;
this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+ this.transactionCreationInitialRateLimit = transactionCreationInitialRateLimit;
}
public static Builder newBuilder() {
return this.shardElectionTimeoutFactor;
}
+
+ public long getTransactionCreationInitialRateLimit() {
+ return transactionCreationInitialRateLimit;
+ }
+
public static class Builder {
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
private int shardSnapshotDataThresholdPercentage = 12;
private long shardElectionTimeoutFactor = 2;
+ private long transactionCreationInitialRateLimit = 100;
public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
return this;
}
+ public Builder transactionCreationInitialRateLimit(long initialRateLimit){
+ this.transactionCreationInitialRateLimit = initialRateLimit;
+ return this;
+ }
+
public DatastoreContext build() {
DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
operationTimeoutInSeconds, shardTransactionIdleTimeout,
shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity,
shardInitializationTimeout, shardLeaderElectionTimeout,
- persistent, configurationReader, shardElectionTimeoutFactor);
+ persistent, configurationReader, shardElectionTimeoutFactor, transactionCreationInitialRateLimit);
}
}
}
actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
ShardManager.props(type, cluster, configuration, datastoreContext)
.withMailbox(ActorContext.MAILBOX), shardManagerId ),
- cluster, configuration, datastoreContext);
+ cluster, configuration, datastoreContext, type);
}
public DistributedDataStore(ActorContext actorContext) {
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ actorContext.acquireTxCreationPermit();
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ actorContext.acquireTxCreationPermit();
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
}
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
private final List<Future<ActorSelection>> cohortFutures;
private volatile List<ActorSelection> cohorts;
private final String transactionId;
+ private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+ @Override
+ public void run() {
+ }
+
+ @Override
+ public void success() {
+ }
+
+ @Override
+ public void failure() {
+ }
+ };
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<Future<ActorSelection>> cohortFutures, String transactionId) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
}
-
- futureList.add(actorContext.executeOperationAsync(cohort, message));
+ futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
}
return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
@Override
public ListenableFuture<Void> commit() {
- return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
- CommitTransactionReply.SERIALIZABLE_CLASS, true);
+ OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
+ new CommitCallback(actorContext);
+
+ return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
+ CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
+ }
+
+ private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException) {
+ return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
}
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException) {
+ final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} {}", transactionId, operationName);
if(cohorts != null) {
finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
- returnFuture);
+ returnFuture, callback);
} else {
buildCohortList().onComplete(new OnComplete<Void>() {
@Override
}
} else {
finishVoidOperation(operationName, message, expectedResponseClass,
- propagateException, returnFuture);
+ propagateException, returnFuture, callback);
}
}
}, actorContext.getActorSystem().dispatcher());
}
private void finishVoidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException,
- final SettableFuture<Void> returnFuture) {
+ final Class<?> expectedResponseClass, final boolean propagateException,
+ final SettableFuture<Void> returnFuture, final OperationCallback callback) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} finish {}", transactionId, operationName);
}
+
+ callback.run();
+
Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
}
if(exceptionToPropagate != null) {
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
operationName, exceptionToPropagate);
}
returnFuture.set(null);
}
+
+ callback.failure();
} else {
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
}
returnFuture.set(null);
+
+ callback.success();
}
}
}, actorContext.getActorSystem().dispatcher());
List<Future<ActorSelection>> getCohortFutures() {
return Collections.unmodifiableList(cohortFutures);
}
+
+ private static interface OperationCallback {
+ void run();
+ void success();
+ void failure();
+ }
+
+ private static class CommitCallback implements OperationCallback{
+
+ private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
+ private static final String COMMIT = "commit";
+
+ private final Timer commitTimer;
+ private final ActorContext actorContext;
+ private Timer.Context timerContext;
+
+ CommitCallback(ActorContext actorContext){
+ this.actorContext = actorContext;
+ commitTimer = actorContext.getOperationTimer(COMMIT);
+ }
+
+ @Override
+ public void run() {
+ timerContext = commitTimer.time();
+ }
+
+ @Override
+ public void success() {
+ timerContext.stop();
+
+ Snapshot timerSnapshot = commitTimer.getSnapshot();
+ double allowedLatencyInNanos = timerSnapshot.get98thPercentile();
+
+ long commitTimeoutInSeconds = actorContext.getDatastoreContext()
+ .getShardTransactionCommitTimeoutInSeconds();
+ long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+ // Here we are trying to find out how many transactions per second are allowed
+ double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
+
+ LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
+ actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
+
+ actorContext.setTxCreationLimit(newRateLimit);
+ }
+
+ @Override
+ public void failure() {
+ // This would mean we couldn't get a transaction completed in 30 seconds which is
+ // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
+ // not going to be useful - so we leave it as it is
+ }
+ }
+
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ actorContext.acquireTxCreationPermit();
return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ actorContext.acquireTxCreationPermit();
return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
}
import akka.dispatch.Mapper;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
* but should not be passed to actors especially remote actors
*/
public class ActorContext {
- private static final Logger
- LOG = LoggerFactory.getLogger(ActorContext.class);
-
- public static final String MAILBOX = "bounded-mailbox";
-
+ private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
+ private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+ private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
+ private static final String METRIC_RATE = "rate";
+ private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
new Mapper<Throwable, Throwable>() {
@Override
return actualFailure;
}
};
+ public static final String MAILBOX = "bounded-mailbox";
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
private final DatastoreContext datastoreContext;
- private volatile SchemaContext schemaContext;
+ private final String dataStoreType;
private final FiniteDuration operationDuration;
private final Timeout operationTimeout;
private final String selfAddressHostPort;
+ private final RateLimiter txRateLimiter;
+ private final MetricRegistry metricRegistry = new MetricRegistry();
+ private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
+ private final Timeout transactionCommitOperationTimeout;
+
+ private volatile SchemaContext schemaContext;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
this(actorSystem, shardManager, clusterWrapper, configuration,
- DatastoreContext.newBuilder().build());
+ DatastoreContext.newBuilder().build(), UNKNOWN_DATA_STORE_TYPE);
}
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration,
- DatastoreContext datastoreContext) {
+ DatastoreContext datastoreContext, String dataStoreType) {
this.actorSystem = actorSystem;
this.shardManager = shardManager;
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
+ this.dataStoreType = dataStoreType;
+ this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
- operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
- TimeUnit.SECONDS);
+ operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
operationTimeout = new Timeout(operationDuration);
+ transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+ TimeUnit.SECONDS));
+
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
}
transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
+ jmxReporter.start();
}
public DatastoreContext getDatastoreContext() {
public int getTransactionOutstandingOperationLimit(){
return transactionOutstandingOperationLimit;
}
+
+ /**
+ * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
+ * us to create a timer for pretty much anything.
+ *
+ * @param operationName
+ * @return
+ */
+ public Timer getOperationTimer(String operationName){
+ final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName, METRIC_RATE);
+ return metricRegistry.timer(rate);
+ }
+
+ /**
+ * Get the type of the data store to which this ActorContext belongs
+ *
+ * @return
+ */
+ public String getDataStoreType() {
+ return dataStoreType;
+ }
+
+ /**
+ * Set the number of transaction creation permits that are to be allowed
+ *
+ * @param permitsPerSecond
+ */
+ public void setTxCreationLimit(double permitsPerSecond){
+ txRateLimiter.setRate(permitsPerSecond);
+ }
+
+ /**
+ * Get the current transaction creation rate limit
+ * @return
+ */
+ public double getTxCreationLimit(){
+ return txRateLimiter.getRate();
+ }
+
+ /**
+ * Try to acquire a transaction creation permit. Will block if no permits are available.
+ */
+ public void acquireTxCreationPermit(){
+ txRateLimiter.acquire();
+ }
+
+ /**
+ * Return the operation timeout to be used when committing transactions
+ * @return
+ */
+ public Timeout getTransactionCommitOperationTimeout(){
+ return transactionCommitOperationTimeout;
+ }
+
+
}
.shardIsolatedLeaderCheckIntervalInMillis(
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+ .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
.build();
return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
.shardIsolatedLeaderCheckIntervalInMillis(
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+ .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
.build();
return DistributedDataStoreFactory.createInstance("operational",
description "The interval at which the leader of the shard will check if its majority
followers are active and term itself as isolated";
}
+
+ leaf tx-creation-initial-rate-limit {
+ default 100;
+ type non-zero-uint32-type;
+ description "The initial number of transactions per second that are allowed before the data store
+ should begin applying back pressure. This number is only used as an initial guidance,
+ subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit";
+ }
}
// Augments the 'configuration' choice node under modules/module.
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class DistributedDataStoreTest extends AbstractActorTest {
+
+ private SchemaContext schemaContext;
+
+ @Mock
+ private ActorContext actorContext;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ schemaContext = TestModel.createTestContext();
+
+ doReturn(schemaContext).when(actorContext).getSchemaContext();
+ }
+
+ @Test
+ public void testRateLimitingUsedInReadWriteTxCreation(){
+ DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+ distributedDataStore.newReadWriteTransaction();
+
+ verify(actorContext, times(1)).acquireTxCreationPermit();
+ }
+
+ @Test
+ public void testRateLimitingUsedInWriteOnlyTxCreation(){
+ DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+ distributedDataStore.newWriteOnlyTransaction();
+
+ verify(actorContext, times(1)).acquireTxCreationPermit();
+ }
+
+
+ @Test
+ public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+ DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+ distributedDataStore.newReadOnlyTransaction();
+ distributedDataStore.newReadOnlyTransaction();
+ distributedDataStore.newReadOnlyTransaction();
+
+ verify(actorContext, times(0)).acquireTxCreationPermit();
+ }
+
+}
\ No newline at end of file
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
@Mock
private ActorContext actorContext;
+ @Mock
+ private DatastoreContext datastoreContext;
+
+ @Mock
+ private Timer commitTimer;
+
+ @Mock
+ private Timer.Context commitTimerContext;
+
+ @Mock
+ private Snapshot commitSnapshot;
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+ doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+ doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+ doReturn(commitTimerContext).when(commitTimer).time();
+ doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+ doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+ doReturn(10.0).when(actorContext).getTxCreationLimit();
}
private Future<ActorSelection> newCohort() {
}
stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
- isA(requestType));
+ isA(requestType), any(Timeout.class));
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
verify(actorContext, times(nCohorts)).executeOperationAsync(
- any(ActorSelection.class), isA(requestType));
+ any(ActorSelection.class), isA(requestType), any(Timeout.class));
}
private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
try {
propagateExecutionExceptionCause(proxy.commit());
} finally {
+
+ verify(actorContext, never()).setTxCreationLimit(anyLong());
verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
}
+
}
@Test
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
new CommitTransactionReply(), new CommitTransactionReply());
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
proxy.canCommit().get(5, TimeUnit.SECONDS);
proxy.preCommit().get(5, TimeUnit.SECONDS);
proxy.commit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+
+ // Verify that the creation limit was changed to 0.5 (based on setup)
+ verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
+ }
+
+ @Test
+ public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
+ proxy.commit().get(5, TimeUnit.SECONDS);
+
+ verify(actorContext, never()).setTxCreationLimit(anyLong());
}
}
package org.opendaylight.controller.cluster.datastore;
import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
ActorContext actorContext = null;
SchemaContext schemaContext = mock(SchemaContext.class);
+ @Mock
+ ActorContext mockActorContext;
+
@Before
public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
actorContext = new MockActorContext(getSystem());
actorContext.setSchemaContext(schemaContext);
+
+ doReturn(schemaContext).when(mockActorContext).getSchemaContext();
}
@SuppressWarnings("resource")
Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
}
+
+ @Test
+ public void testRateLimitingUsedInReadWriteTxCreation(){
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ txChainProxy.newReadWriteTransaction();
+
+ verify(mockActorContext, times(1)).acquireTxCreationPermit();
+ }
+
+ @Test
+ public void testRateLimitingUsedInWriteOnlyTxCreation(){
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ txChainProxy.newWriteOnlyTransaction();
+
+ verify(mockActorContext, times(1)).acquireTxCreationPermit();
+ }
+
+
+ @Test
+ public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ txChainProxy.newReadOnlyTransaction();
+
+ verify(mockActorContext, times(0)).acquireTxCreationPermit();
+ }
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.time.StopWatch;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
assertEquals(expected, actual);
}
+ @Test
+ public void testRateLimiting(){
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext, "config");
+
+ // Check that the initial value is being picked up from DataStoreContext
+ assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+
+ actorContext.setTxCreationLimit(1.0);
+
+ assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
+
+
+ StopWatch watch = new StopWatch();
+
+ watch.start();
+
+ actorContext.acquireTxCreationPermit();
+ actorContext.acquireTxCreationPermit();
+ actorContext.acquireTxCreationPermit();
+
+ watch.stop();
+
+ assertTrue("did not take as much time as expected", watch.getTime() > 1000);
+ }
}