final AbstractThreePhaseCommitCohort<?> ret;
switch (txContextAdapters.size()) {
case 0:
- TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(txContextFactory.getActorContext());
ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
break;
case 1:
package org.opendaylight.controller.cluster.datastore;
-import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* TransactionRateLimitingCallback computes the new transaction rate limit on the successful completion of a
* transaction
*/
public class TransactionRateLimitingCallback implements OperationCallback{
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactionRateLimitingCallback.class);
- private static final String COMMIT = "commit";
-
private final Timer commitTimer;
- private final ActorContext actorContext;
private Timer.Context timerContext;
TransactionRateLimitingCallback(ActorContext actorContext){
- this.actorContext = actorContext;
- commitTimer = actorContext.getOperationTimer(COMMIT);
+ commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT);
}
@Override
public void success() {
Preconditions.checkState(timerContext != null, "Call run before success");
timerContext.stop();
-
- double newRateLimit = calculateNewRateLimit(commitTimer, actorContext.getDatastoreContext());
-
- LOG.debug("Data Store {} commit rateLimit adjusted to {}", actorContext.getDataStoreType(), newRateLimit);
-
- actorContext.setTxCreationLimit(newRateLimit);
}
@Override
// not going to be useful - so we leave it as it is
}
- private static double calculateNewRateLimit(Timer commitTimer, DatastoreContext context) {
- if(commitTimer == null) {
- // This can happen in unit tests.
- return 0;
- }
-
- Snapshot timerSnapshot = commitTimer.getSnapshot();
- double newRateLimit = 0;
-
- long commitTimeoutInSeconds = context.getShardTransactionCommitTimeoutInSeconds();
- long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
-
- // Find the time that it takes for transactions to get executed in every 10th percentile
- // Compute the rate limit for that percentile and sum it up
- for(int i=1;i<=10;i++){
- // Get the amount of time transactions take in the i*10th percentile
- double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D);
-
- if(percentileTimeInNanos > 0) {
- // Figure out the rate limit for the i*10th percentile in nanos
- double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos);
-
- // Add the percentileRateLimit to the total rate limit
- newRateLimit += percentileRateLimit;
- }
- }
-
- // Compute the rate limit per second
- return newRateLimit/(commitTimeoutInSeconds*10);
- }
-
- public static void adjustRateLimitForUnusedTransaction(ActorContext actorContext) {
- // Unused transactions in one data store can artificially limit the rate for other data stores
- // if the first data store's rate is still at a lower initial rate since the front-end creates
- // transactions in each data store up-front even though the client may not actually submit changes.
- // So we may have to adjust the rate for data stores with unused transactions.
-
- // First calculate the current rate for the data store. If it's 0 then there have been no
- // actual transactions committed to the data store.
-
- double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(COMMIT),
- actorContext.getDatastoreContext());
- if(newRateLimit == 0.0) {
- // Since we have no rate data for unused Tx's data store, adjust to the rate from another
- // data store that does have rate data.
- for(String datastoreType: DatastoreContext.getGlobalDatastoreTypes()) {
- if(datastoreType.equals(actorContext.getDataStoreType())) {
- continue;
- }
-
- newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(datastoreType, COMMIT),
- actorContext.getDatastoreContext());
- if(newRateLimit > 0.0) {
- LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}",
- actorContext.getDataStoreType(), newRateLimit);
-
- actorContext.setTxCreationLimit(newRateLimit);
- break;
- }
- }
- }
- }
}
\ No newline at end of file
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
}
};
public static final String MAILBOX = "bounded-mailbox";
+ public static final String COMMIT = "commit";
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private FiniteDuration operationDuration;
private Timeout operationTimeout;
private final String selfAddressHostPort;
- private RateLimiter txRateLimiter;
+ private TransactionRateLimiter txRateLimiter;
private final int transactionOutstandingOperationLimit;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
}
private void setCachedProperties() {
- txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+ txRateLimiter = new TransactionRateLimiter(this);
operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
operationTimeout = new Timeout(operationDuration);
return datastoreContext.getDataStoreType();
}
- /**
- * Set the number of transaction creation permits that are to be allowed
- *
- * @param permitsPerSecond
- */
- public void setTxCreationLimit(double permitsPerSecond){
- txRateLimiter.setRate(permitsPerSecond);
- }
-
/**
* Get the current transaction creation rate limit
* @return
*/
public double getTxCreationLimit(){
- return txRateLimiter.getRate();
+ return txRateLimiter.getTxCreationLimit();
}
/**
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TransactionRateLimiter {
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionRateLimiter.class);
+
+ private final ActorContext actorContext;
+ private final long commitTimeoutInSeconds;
+ private final String dataStoreType;
+ private final RateLimiter txRateLimiter;
+ private final AtomicLong acquireCount = new AtomicLong();
+
+ private volatile long pollOnCount = 1;
+
+ public TransactionRateLimiter(ActorContext actorContext){
+ this.actorContext = actorContext;
+ this.commitTimeoutInSeconds = actorContext.getDatastoreContext().getShardTransactionCommitTimeoutInSeconds();
+ this.dataStoreType = actorContext.getDataStoreType();
+ this.txRateLimiter = RateLimiter.create(actorContext.getDatastoreContext().getTransactionCreationInitialRateLimit());
+ }
+
+ public void acquire(){
+ adjustRateLimit();
+ txRateLimiter.acquire();
+ }
+
+ private void adjustRateLimit() {
+ final long count = acquireCount.incrementAndGet();
+ if(count >= pollOnCount) {
+ final Timer commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT);
+ double newRateLimit = calculateNewRateLimit(commitTimer, commitTimeoutInSeconds);
+
+ if (newRateLimit < 1.0) {
+ newRateLimit = getRateLimitFromOtherDataStores();
+ }
+
+ if (newRateLimit >= 1.0) {
+ txRateLimiter.setRate(newRateLimit);
+ pollOnCount = count + ((long) newRateLimit/2);
+ }
+ }
+ }
+
+ public double getTxCreationLimit(){
+ return txRateLimiter.getRate();
+ }
+
+ private double getRateLimitFromOtherDataStores(){
+ // Since we have no rate data for unused Tx's data store, adjust to the rate from another
+ // data store that does have rate data.
+ for(String datastoreType: DatastoreContext.getGlobalDatastoreTypes()) {
+ if(datastoreType.equals(this.dataStoreType)) {
+ continue;
+ }
+
+ double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(datastoreType, ActorContext.COMMIT),
+ this.commitTimeoutInSeconds);
+ if(newRateLimit > 0.0) {
+ LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}",
+ this.dataStoreType, newRateLimit);
+
+ return newRateLimit;
+ }
+ }
+
+ return -1.0D;
+ }
+
+ private double calculateNewRateLimit(Timer commitTimer, long commitTimeoutInSeconds) {
+ if(commitTimer == null) {
+ // This can happen in unit tests.
+ return 0;
+ }
+
+ Snapshot timerSnapshot = commitTimer.getSnapshot();
+ double newRateLimit = 0;
+
+ long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+ // Find the time that it takes for transactions to get executed in every 10th percentile
+ // Compute the rate limit for that percentile and sum it up
+ for(int i=1;i<=10;i++){
+ // Get the amount of time transactions take in the i*10th percentile
+ double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D);
+
+ if(percentileTimeInNanos > 0) {
+ // Figure out the rate limit for the i*10th percentile in nanos
+ double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos);
+
+ // Add the percentileRateLimit to the total rate limit
+ newRateLimit += percentileRateLimit;
+ }
+ }
+
+ // Compute the rate limit per second
+ return newRateLimit/(commitTimeoutInSeconds*10);
+ }
+
+ @VisibleForTesting
+ long getPollOnCount() {
+ return pollOnCount;
+ }
+
+ @VisibleForTesting
+ void setPollOnCount(long value){
+ pollOnCount = value;
+ }
+
+ @VisibleForTesting
+ void setAcquireCount(long value){
+ acquireCount.set(value);
+ }
+
+}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import akka.actor.ActorPath;
propagateExecutionExceptionCause(proxy.commit());
} finally {
- verify(actorContext, never()).setTxCreationLimit(anyLong());
verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
}
proxy.preCommit().get(5, TimeUnit.SECONDS);
proxy.commit().get(5, TimeUnit.SECONDS);
- verify(actorContext, never()).setTxCreationLimit(anyLong());
}
}
package org.opendaylight.controller.cluster.datastore;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyDouble;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
-import java.util.concurrent.TimeUnit;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Matchers;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
public class TransactionRateLimitingCommitCallbackTest {
-
@Mock
public ActorContext actorContext;
@Test
public void testSuccess(){
-
- for(int i=1;i<11;i++){
- // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
- // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
- doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
- }
-
-
- TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
- commitCallback.run();
- commitCallback.success();
-
- verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(292)));
- }
-
- @Test
- public void testSuccessPercentileValueZero(){
-
- for(int i=1;i<11;i++){
- // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
- // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
- doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
- }
-
- doReturn(TimeUnit.MILLISECONDS.toNanos(0) * 1D).when(commitSnapshot).getValue(0.1);
-
- TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
- commitCallback.run();
- commitCallback.success();
-
- verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(192)));
- }
-
- @Test
- public void testSuccessOnePercentileValueVeryHigh(){
-
- for(int i=1;i<11;i++){
- // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
- // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
- doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
- }
-
- // ten seconds
- doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(1.0);
-
- TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
- commitCallback.run();
- commitCallback.success();
-
- verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(282)));
- }
-
- @Test
- public void testSuccessWithAllPercentileValueVeryHigh(){
-
- for(int i=1;i<11;i++){
- // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
- // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
- doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(i * 0.1);
- }
-
- TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
- commitCallback.run();
- commitCallback.success();
-
- verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(0)));
- }
-
- @Test
- public void testSuccessWithRealPercentileValues(){
-
- for(int i=1;i<11;i++){
- // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
- // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
- doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1);
- }
-
- doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue( 0.7);
- doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue( 0.9);
- doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue( 1.0);
-
TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
commitCallback.run();
commitCallback.success();
- verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(101)));
+ verify(commitTimerContext).stop();
}
-
@Test
public void testSuccessWithoutRun(){
TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
} catch(IllegalStateException e){
}
-
- verify(actorContext, never()).setTxCreationLimit(anyDouble());
-
+ verify(commitTimerContext, never()).stop();
}
commitCallback.run();
commitCallback.failure();
- verify(actorContext, never()).setTxCreationLimit(anyDouble());
-
- }
-
- @Test
- public void testAdjustRateLimitForUnusedTransaction() {
- doReturn(commitTimer).when(actorContext).getOperationTimer("one", "commit");
- doReturn("one").when(actorContext).getDataStoreType();
-
- Timer commitTimer2 = Mockito.mock(Timer.class);
- Snapshot commitSnapshot2 = Mockito.mock(Snapshot.class);
-
- doReturn(commitSnapshot2).when(commitTimer2).getSnapshot();
-
- doReturn(commitTimer2).when(actorContext).getOperationTimer("two", "commit");
-
- DatastoreContext.newBuilder().dataStoreType("one").build();
- DatastoreContext.newBuilder().dataStoreType("two").build();
-
- doReturn(TimeUnit.MICROSECONDS.toNanos(500) * 1D).when(commitSnapshot).getValue(1 * 0.1);
-
- TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
-
- verify(actorContext, never()).setTxCreationLimit(anyDouble());
-
- Mockito.reset(commitSnapshot);
-
- TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
-
- verify(actorContext, never()).setTxCreationLimit(anyDouble());
-
- System.out.println(""+TimeUnit.SECONDS.toNanos(30)/TimeUnit.MICROSECONDS.toNanos(100));
-
- doReturn(TimeUnit.MICROSECONDS.toNanos(100) * 1D).when(commitSnapshot2).getValue(1 * 0.1);
-
- TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
-
- verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(1000)));
- }
-
- public Matcher<Double> approximately(final double val){
- return new BaseMatcher<Double>() {
- @Override
- public boolean matches(Object o) {
- Double aDouble = (Double) o;
- return aDouble >= val && aDouble <= val+1;
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("> " + val +" < " + (val+1));
- }
- };
+ verify(commitTimerContext, never()).stop();
}
-
}
\ No newline at end of file
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.time.StopWatch;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
assertEquals(expected, actual);
}
- @Test
- public void testRateLimiting(){
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
- transactionCreationInitialRateLimit(155L).build();
-
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext);
-
- // Check that the initial value is being picked up from DataStoreContext
- assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
-
- actorContext.setTxCreationLimit(1.0);
-
- assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
-
-
- StopWatch watch = new StopWatch();
-
- watch.start();
-
- actorContext.acquireTxCreationPermit();
- actorContext.acquireTxCreationPermit();
- actorContext.acquireTxCreationPermit();
-
- watch.stop();
-
- assertTrue("did not take as much time as expected", watch.getTime() > 1000);
- }
@Test
public void testClientDispatcherIsGlobalDispatcher(){
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.time.StopWatch;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+
+public class TransactionRateLimiterTest {
+
+ @Mock
+ public ActorContext actorContext;
+
+ @Mock
+ public DatastoreContext datastoreContext;
+
+ @Mock
+ public Timer commitTimer;
+
+ @Mock
+ private Timer.Context commitTimerContext;
+
+ @Mock
+ private Snapshot commitSnapshot;
+
+ @Before
+ public void setUp(){
+ MockitoAnnotations.initMocks(this);
+ doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+ doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+ doReturn(100L).when(datastoreContext).getTransactionCreationInitialRateLimit();
+ doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+ doReturn(commitTimerContext).when(commitTimer).time();
+ doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+ }
+
+ @Test
+ public void testAcquireRateLimitChanged(){
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+ rateLimiter.acquire();
+
+ assertThat(rateLimiter.getTxCreationLimit(), approximately(292));
+
+ assertEquals(147, rateLimiter.getPollOnCount());
+ }
+
+
+ @Test
+ public void testAcquirePercentileValueZero(){
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ doReturn(TimeUnit.MILLISECONDS.toNanos(0) * 1D).when(commitSnapshot).getValue(0.1);
+
+ TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+ rateLimiter.acquire();
+
+ assertThat(rateLimiter.getTxCreationLimit(), approximately(192));
+
+ assertEquals(97, rateLimiter.getPollOnCount());
+ }
+
+ @Test
+ public void testAcquireOnePercentileValueVeryHigh(){
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ // ten seconds
+ doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(1.0);
+
+ TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+ rateLimiter.acquire();
+
+ assertThat(rateLimiter.getTxCreationLimit(), approximately(282));
+
+ assertEquals(142, rateLimiter.getPollOnCount());
+ }
+
+ @Test
+ public void testAcquireWithAllPercentileValueVeryHigh(){
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+ rateLimiter.acquire();
+
+ // The initial rate limit will be retained here because the calculated rate limit was too small
+ assertThat(rateLimiter.getTxCreationLimit(), approximately(100));
+
+ assertEquals(1, rateLimiter.getPollOnCount());
+ }
+
+ @Test
+ public void testAcquireWithRealPercentileValues(){
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue(0.7);
+ doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue(0.9);
+ doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue(1.0);
+
+ TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+ rateLimiter.acquire();
+
+ assertThat(rateLimiter.getTxCreationLimit(), approximately(101));
+
+ assertEquals(51, rateLimiter.getPollOnCount());
+ }
+
+
+
+ @Test
+ public void testAcquireGetRateLimitFromOtherDataStores(){
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(0.0D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ Timer operationalCommitTimer = mock(Timer.class);
+ Timer.Context operationalCommitTimerContext = mock(Timer.Context.class);
+ Snapshot operationalCommitSnapshot = mock(Snapshot.class);
+
+ doReturn(operationalCommitTimer).when(actorContext).getOperationTimer("operational", "commit");
+ doReturn(operationalCommitTimerContext).when(operationalCommitTimer).time();
+ doReturn(operationalCommitSnapshot).when(operationalCommitTimer).getSnapshot();
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(operationalCommitSnapshot).getValue(i * 0.1);
+ }
+
+
+ DatastoreContext.getGlobalDatastoreTypes().add("config");
+ DatastoreContext.getGlobalDatastoreTypes().add("operational");
+
+ TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+ rateLimiter.acquire();
+
+ assertThat(rateLimiter.getTxCreationLimit(), approximately(292));
+
+ assertEquals(147, rateLimiter.getPollOnCount());
+ }
+
+ @Test
+ public void testRateLimiting(){
+
+ for(int i=1;i<11;i++){
+ doReturn(TimeUnit.SECONDS.toNanos(1) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+ StopWatch watch = new StopWatch();
+
+ watch.start();
+
+ rateLimiter.acquire();
+ rateLimiter.acquire();
+ rateLimiter.acquire();
+
+ watch.stop();
+
+ assertTrue("did not take as much time as expected rate limit : " + rateLimiter.getTxCreationLimit(),
+ watch.getTime() > 1000);
+ }
+
+ @Test
+ public void testRateLimitNotCalculatedUntilPollCountReached(){
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue(0.7);
+ doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue(0.9);
+ doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue(1.0);
+
+ TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+ rateLimiter.acquire();
+
+ assertThat(rateLimiter.getTxCreationLimit(), approximately(101));
+
+ assertEquals(51, rateLimiter.getPollOnCount());
+
+ for(int i=0;i<49;i++){
+ rateLimiter.acquire();
+ }
+
+ verify(commitTimer, times(1)).getSnapshot();
+
+ // Acquiring one more time will cause the re-calculation of the rate limit
+ rateLimiter.acquire();
+
+ verify(commitTimer, times(2)).getSnapshot();
+ }
+
+ @Test
+ public void testAcquireNegativeAcquireAndPollOnCount(){
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue(0.7);
+ doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue(0.9);
+ doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue(1.0);
+
+ TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+ rateLimiter.setAcquireCount(Long.MAX_VALUE-1);
+ rateLimiter.setPollOnCount(Long.MAX_VALUE);
+
+ rateLimiter.acquire();
+
+ assertThat(rateLimiter.getTxCreationLimit(), approximately(101));
+
+ assertEquals(-9223372036854775759L, rateLimiter.getPollOnCount());
+
+ for(int i=0;i<50;i++){
+ rateLimiter.acquire();
+ }
+
+ verify(commitTimer, times(2)).getSnapshot();
+
+ }
+
+ public Matcher<Double> approximately(final double val){
+ return new BaseMatcher<Double>() {
+ @Override
+ public boolean matches(Object o) {
+ Double aDouble = (Double) o;
+ return aDouble >= val && aDouble <= val+1;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("> " + val +" < " + (val+1));
+ }
+ };
+ }
+
+
+}
\ No newline at end of file