From: Moiz Raja Date: Mon, 11 May 2015 22:43:43 +0000 (-0700) Subject: BUG 3125 : Set Rate Limit just before acquiring a permit to avoid contention X-Git-Tag: release/lithium~151 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=89e2dda49ea99a79d510a9cc72ebcb09a9d879de BUG 3125 : Set Rate Limit just before acquiring a permit to avoid contention During perfomance test performance is hampered by the rate limiting code. To avoid the penalty of possible contention when setting the rate limit in a separate thread from the one acquiring a rate permit this patch calculates the new rate limit and sets it on the rate limiter on the same thread which acquires the rate limit. Moving the setting of the rate limit into the same thread which does the acquiring of the permit did not have any discernable effect on performance so I continued playing around and found that simply calculating the rate limit can cause issues with performance. This happens because the Coda Hale metrics Timer that we use to help calculate the rate limit is also synchronized internally with a read write lock and that causes contention. To fix the situation with the rate limit calculation I do not calculate the rate limit everytime someone tries to acquire a permit but only after acquiring about 1/2 of the last calculated rate limit. To make it easier to understand/test the rate limiting code I have slightly refactored the code. All the transaction rate limiting code is now in TransactionRateLimiter. After making these changes I was able to get the same performance in the dsBenchmark test as you would get from not using a rate limiter at all. Change-Id: Ia7639ac3acdc08140fbf5eef03120f857dc44994 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 82258b46a4..eca930a03d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -244,7 +244,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction ret; switch (txContextAdapters.size()) { case 0: - TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(txContextFactory.getActorContext()); ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; break; case 1: diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java index 526ce59aab..678891446a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java @@ -8,30 +8,20 @@ package org.opendaylight.controller.cluster.datastore; -import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; -import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * TransactionRateLimitingCallback computes the new transaction rate limit on the successful completion of a * transaction */ public class TransactionRateLimitingCallback implements OperationCallback{ - - private static final Logger LOG = LoggerFactory.getLogger(TransactionRateLimitingCallback.class); - private static final String COMMIT = "commit"; - private final Timer commitTimer; - private final ActorContext actorContext; private Timer.Context timerContext; TransactionRateLimitingCallback(ActorContext actorContext){ - this.actorContext = actorContext; - commitTimer = actorContext.getOperationTimer(COMMIT); + commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT); } @Override @@ -43,12 +33,6 @@ public class TransactionRateLimitingCallback implements OperationCallback{ public void success() { Preconditions.checkState(timerContext != null, "Call run before success"); timerContext.stop(); - - double newRateLimit = calculateNewRateLimit(commitTimer, actorContext.getDatastoreContext()); - - LOG.debug("Data Store {} commit rateLimit adjusted to {}", actorContext.getDataStoreType(), newRateLimit); - - actorContext.setTxCreationLimit(newRateLimit); } @Override @@ -58,66 +42,4 @@ public class TransactionRateLimitingCallback implements OperationCallback{ // not going to be useful - so we leave it as it is } - private static double calculateNewRateLimit(Timer commitTimer, DatastoreContext context) { - if(commitTimer == null) { - // This can happen in unit tests. - return 0; - } - - Snapshot timerSnapshot = commitTimer.getSnapshot(); - double newRateLimit = 0; - - long commitTimeoutInSeconds = context.getShardTransactionCommitTimeoutInSeconds(); - long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds); - - // Find the time that it takes for transactions to get executed in every 10th percentile - // Compute the rate limit for that percentile and sum it up - for(int i=1;i<=10;i++){ - // Get the amount of time transactions take in the i*10th percentile - double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D); - - if(percentileTimeInNanos > 0) { - // Figure out the rate limit for the i*10th percentile in nanos - double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos); - - // Add the percentileRateLimit to the total rate limit - newRateLimit += percentileRateLimit; - } - } - - // Compute the rate limit per second - return newRateLimit/(commitTimeoutInSeconds*10); - } - - public static void adjustRateLimitForUnusedTransaction(ActorContext actorContext) { - // Unused transactions in one data store can artificially limit the rate for other data stores - // if the first data store's rate is still at a lower initial rate since the front-end creates - // transactions in each data store up-front even though the client may not actually submit changes. - // So we may have to adjust the rate for data stores with unused transactions. - - // First calculate the current rate for the data store. If it's 0 then there have been no - // actual transactions committed to the data store. - - double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(COMMIT), - actorContext.getDatastoreContext()); - if(newRateLimit == 0.0) { - // Since we have no rate data for unused Tx's data store, adjust to the rate from another - // data store that does have rate data. - for(String datastoreType: DatastoreContext.getGlobalDatastoreTypes()) { - if(datastoreType.equals(actorContext.getDataStoreType())) { - continue; - } - - newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(datastoreType, COMMIT), - actorContext.getDatastoreContext()); - if(newRateLimit > 0.0) { - LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}", - actorContext.getDataStoreType(), newRateLimit); - - actorContext.setTxCreationLimit(newRateLimit); - break; - } - } - } - } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index ad05a1ca71..5b4f54daf8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -30,7 +30,6 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; -import com.google.common.util.concurrent.RateLimiter; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.TimeUnit; @@ -90,6 +89,7 @@ public class ActorContext implements RemovalListener= pollOnCount) { + final Timer commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT); + double newRateLimit = calculateNewRateLimit(commitTimer, commitTimeoutInSeconds); + + if (newRateLimit < 1.0) { + newRateLimit = getRateLimitFromOtherDataStores(); + } + + if (newRateLimit >= 1.0) { + txRateLimiter.setRate(newRateLimit); + pollOnCount = count + ((long) newRateLimit/2); + } + } + } + + public double getTxCreationLimit(){ + return txRateLimiter.getRate(); + } + + private double getRateLimitFromOtherDataStores(){ + // Since we have no rate data for unused Tx's data store, adjust to the rate from another + // data store that does have rate data. + for(String datastoreType: DatastoreContext.getGlobalDatastoreTypes()) { + if(datastoreType.equals(this.dataStoreType)) { + continue; + } + + double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(datastoreType, ActorContext.COMMIT), + this.commitTimeoutInSeconds); + if(newRateLimit > 0.0) { + LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}", + this.dataStoreType, newRateLimit); + + return newRateLimit; + } + } + + return -1.0D; + } + + private double calculateNewRateLimit(Timer commitTimer, long commitTimeoutInSeconds) { + if(commitTimer == null) { + // This can happen in unit tests. + return 0; + } + + Snapshot timerSnapshot = commitTimer.getSnapshot(); + double newRateLimit = 0; + + long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds); + + // Find the time that it takes for transactions to get executed in every 10th percentile + // Compute the rate limit for that percentile and sum it up + for(int i=1;i<=10;i++){ + // Get the amount of time transactions take in the i*10th percentile + double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D); + + if(percentileTimeInNanos > 0) { + // Figure out the rate limit for the i*10th percentile in nanos + double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos); + + // Add the percentileRateLimit to the total rate limit + newRateLimit += percentileRateLimit; + } + } + + // Compute the rate limit per second + return newRateLimit/(commitTimeoutInSeconds*10); + } + + @VisibleForTesting + long getPollOnCount() { + return pollOnCount; + } + + @VisibleForTesting + void setPollOnCount(long value){ + pollOnCount = value; + } + + @VisibleForTesting + void setAcquireCount(long value){ + acquireCount.set(value); + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index d595adc8bb..cb3bd60fbf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -3,10 +3,8 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import akka.actor.ActorPath; @@ -312,7 +310,6 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { propagateExecutionExceptionCause(proxy.commit()); } finally { - verify(actorContext, never()).setTxCreationLimit(anyLong()); verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS); } @@ -354,6 +351,5 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { proxy.preCommit().get(5, TimeUnit.SECONDS); proxy.commit().get(5, TimeUnit.SECONDS); - verify(actorContext, never()).setTxCreationLimit(anyLong()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java index 69a023ad31..de3a56d078 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java @@ -9,26 +9,18 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyDouble; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; -import java.util.concurrent.TimeUnit; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.hamcrest.Matcher; import org.junit.Before; import org.junit.Test; -import org.mockito.Matchers; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; public class TransactionRateLimitingCommitCallbackTest { - @Mock public ActorContext actorContext; @@ -56,95 +48,13 @@ public class TransactionRateLimitingCommitCallbackTest { @Test public void testSuccess(){ - - for(int i=1;i<11;i++){ - // Keep on increasing the amount of time it takes to complete transaction for each tenth of a - // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. - doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); - } - - - TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext); - commitCallback.run(); - commitCallback.success(); - - verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(292))); - } - - @Test - public void testSuccessPercentileValueZero(){ - - for(int i=1;i<11;i++){ - // Keep on increasing the amount of time it takes to complete transaction for each tenth of a - // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. - doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); - } - - doReturn(TimeUnit.MILLISECONDS.toNanos(0) * 1D).when(commitSnapshot).getValue(0.1); - - TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext); - commitCallback.run(); - commitCallback.success(); - - verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(192))); - } - - @Test - public void testSuccessOnePercentileValueVeryHigh(){ - - for(int i=1;i<11;i++){ - // Keep on increasing the amount of time it takes to complete transaction for each tenth of a - // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. - doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); - } - - // ten seconds - doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(1.0); - - TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext); - commitCallback.run(); - commitCallback.success(); - - verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(282))); - } - - @Test - public void testSuccessWithAllPercentileValueVeryHigh(){ - - for(int i=1;i<11;i++){ - // Keep on increasing the amount of time it takes to complete transaction for each tenth of a - // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. - doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(i * 0.1); - } - - TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext); - commitCallback.run(); - commitCallback.success(); - - verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(0))); - } - - @Test - public void testSuccessWithRealPercentileValues(){ - - for(int i=1;i<11;i++){ - // Keep on increasing the amount of time it takes to complete transaction for each tenth of a - // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. - doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1); - } - - doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue( 0.7); - doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue( 0.9); - doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue( 1.0); - TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext); commitCallback.run(); commitCallback.success(); - verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(101))); + verify(commitTimerContext).stop(); } - @Test public void testSuccessWithoutRun(){ TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext); @@ -155,9 +65,7 @@ public class TransactionRateLimitingCommitCallbackTest { } catch(IllegalStateException e){ } - - verify(actorContext, never()).setTxCreationLimit(anyDouble()); - + verify(commitTimerContext, never()).stop(); } @@ -167,60 +75,7 @@ public class TransactionRateLimitingCommitCallbackTest { commitCallback.run(); commitCallback.failure(); - verify(actorContext, never()).setTxCreationLimit(anyDouble()); - - } - - @Test - public void testAdjustRateLimitForUnusedTransaction() { - doReturn(commitTimer).when(actorContext).getOperationTimer("one", "commit"); - doReturn("one").when(actorContext).getDataStoreType(); - - Timer commitTimer2 = Mockito.mock(Timer.class); - Snapshot commitSnapshot2 = Mockito.mock(Snapshot.class); - - doReturn(commitSnapshot2).when(commitTimer2).getSnapshot(); - - doReturn(commitTimer2).when(actorContext).getOperationTimer("two", "commit"); - - DatastoreContext.newBuilder().dataStoreType("one").build(); - DatastoreContext.newBuilder().dataStoreType("two").build(); - - doReturn(TimeUnit.MICROSECONDS.toNanos(500) * 1D).when(commitSnapshot).getValue(1 * 0.1); - - TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext); - - verify(actorContext, never()).setTxCreationLimit(anyDouble()); - - Mockito.reset(commitSnapshot); - - TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext); - - verify(actorContext, never()).setTxCreationLimit(anyDouble()); - - System.out.println(""+TimeUnit.SECONDS.toNanos(30)/TimeUnit.MICROSECONDS.toNanos(100)); - - doReturn(TimeUnit.MICROSECONDS.toNanos(100) * 1D).when(commitSnapshot2).getValue(1 * 0.1); - - TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext); - - verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(1000))); - } - - public Matcher approximately(final double val){ - return new BaseMatcher() { - @Override - public boolean matches(Object o) { - Double aDouble = (Double) o; - return aDouble >= val && aDouble <= val+1; - } - - @Override - public void describeTo(Description description) { - description.appendText("> " + val +" < " + (val+1)); - } - }; + verify(commitTimerContext, never()).stop(); } - } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 031463b2b9..377c05bf8c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -28,7 +28,6 @@ import com.typesafe.config.ConfigFactory; import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang.time.StopWatch; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -327,35 +326,6 @@ public class ActorContextTest extends AbstractActorTest{ assertEquals(expected, actual); } - @Test - public void testRateLimiting(){ - DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). - transactionCreationInitialRateLimit(155L).build(); - - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), dataStoreContext); - - // Check that the initial value is being picked up from DataStoreContext - assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); - - actorContext.setTxCreationLimit(1.0); - - assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15); - - - StopWatch watch = new StopWatch(); - - watch.start(); - - actorContext.acquireTxCreationPermit(); - actorContext.acquireTxCreationPermit(); - actorContext.acquireTxCreationPermit(); - - watch.stop(); - - assertTrue("did not take as much time as expected", watch.getTime() > 1000); - } @Test public void testClientDispatcherIsGlobalDispatcher(){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TransactionRateLimiterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TransactionRateLimiterTest.java new file mode 100644 index 0000000000..2c89716487 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TransactionRateLimiterTest.java @@ -0,0 +1,299 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.time.StopWatch; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; + +public class TransactionRateLimiterTest { + + @Mock + public ActorContext actorContext; + + @Mock + public DatastoreContext datastoreContext; + + @Mock + public Timer commitTimer; + + @Mock + private Timer.Context commitTimerContext; + + @Mock + private Snapshot commitSnapshot; + + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); + doReturn(datastoreContext).when(actorContext).getDatastoreContext(); + doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds(); + doReturn(100L).when(datastoreContext).getTransactionCreationInitialRateLimit(); + doReturn(commitTimer).when(actorContext).getOperationTimer("commit"); + doReturn(commitTimerContext).when(commitTimer).time(); + doReturn(commitSnapshot).when(commitTimer).getSnapshot(); + } + + @Test + public void testAcquireRateLimitChanged(){ + for(int i=1;i<11;i++){ + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); + } + + TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext); + + rateLimiter.acquire(); + + assertThat(rateLimiter.getTxCreationLimit(), approximately(292)); + + assertEquals(147, rateLimiter.getPollOnCount()); + } + + + @Test + public void testAcquirePercentileValueZero(){ + + for(int i=1;i<11;i++){ + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); + } + + doReturn(TimeUnit.MILLISECONDS.toNanos(0) * 1D).when(commitSnapshot).getValue(0.1); + + TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext); + + rateLimiter.acquire(); + + assertThat(rateLimiter.getTxCreationLimit(), approximately(192)); + + assertEquals(97, rateLimiter.getPollOnCount()); + } + + @Test + public void testAcquireOnePercentileValueVeryHigh(){ + + for(int i=1;i<11;i++){ + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); + } + + // ten seconds + doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(1.0); + + TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext); + + rateLimiter.acquire(); + + assertThat(rateLimiter.getTxCreationLimit(), approximately(282)); + + assertEquals(142, rateLimiter.getPollOnCount()); + } + + @Test + public void testAcquireWithAllPercentileValueVeryHigh(){ + + for(int i=1;i<11;i++){ + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(i * 0.1); + } + + TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext); + + rateLimiter.acquire(); + + // The initial rate limit will be retained here because the calculated rate limit was too small + assertThat(rateLimiter.getTxCreationLimit(), approximately(100)); + + assertEquals(1, rateLimiter.getPollOnCount()); + } + + @Test + public void testAcquireWithRealPercentileValues(){ + + for(int i=1;i<11;i++){ + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1); + } + + doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue(0.7); + doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue(0.9); + doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue(1.0); + + TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext); + + rateLimiter.acquire(); + + assertThat(rateLimiter.getTxCreationLimit(), approximately(101)); + + assertEquals(51, rateLimiter.getPollOnCount()); + } + + + + @Test + public void testAcquireGetRateLimitFromOtherDataStores(){ + for(int i=1;i<11;i++){ + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + doReturn(0.0D).when(commitSnapshot).getValue(i * 0.1); + } + + Timer operationalCommitTimer = mock(Timer.class); + Timer.Context operationalCommitTimerContext = mock(Timer.Context.class); + Snapshot operationalCommitSnapshot = mock(Snapshot.class); + + doReturn(operationalCommitTimer).when(actorContext).getOperationTimer("operational", "commit"); + doReturn(operationalCommitTimerContext).when(operationalCommitTimer).time(); + doReturn(operationalCommitSnapshot).when(operationalCommitTimer).getSnapshot(); + + for(int i=1;i<11;i++){ + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(operationalCommitSnapshot).getValue(i * 0.1); + } + + + DatastoreContext.getGlobalDatastoreTypes().add("config"); + DatastoreContext.getGlobalDatastoreTypes().add("operational"); + + TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext); + + rateLimiter.acquire(); + + assertThat(rateLimiter.getTxCreationLimit(), approximately(292)); + + assertEquals(147, rateLimiter.getPollOnCount()); + } + + @Test + public void testRateLimiting(){ + + for(int i=1;i<11;i++){ + doReturn(TimeUnit.SECONDS.toNanos(1) * 1D).when(commitSnapshot).getValue(i * 0.1); + } + + TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext); + + StopWatch watch = new StopWatch(); + + watch.start(); + + rateLimiter.acquire(); + rateLimiter.acquire(); + rateLimiter.acquire(); + + watch.stop(); + + assertTrue("did not take as much time as expected rate limit : " + rateLimiter.getTxCreationLimit(), + watch.getTime() > 1000); + } + + @Test + public void testRateLimitNotCalculatedUntilPollCountReached(){ + + for(int i=1;i<11;i++){ + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1); + } + + doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue(0.7); + doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue(0.9); + doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue(1.0); + + TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext); + + rateLimiter.acquire(); + + assertThat(rateLimiter.getTxCreationLimit(), approximately(101)); + + assertEquals(51, rateLimiter.getPollOnCount()); + + for(int i=0;i<49;i++){ + rateLimiter.acquire(); + } + + verify(commitTimer, times(1)).getSnapshot(); + + // Acquiring one more time will cause the re-calculation of the rate limit + rateLimiter.acquire(); + + verify(commitTimer, times(2)).getSnapshot(); + } + + @Test + public void testAcquireNegativeAcquireAndPollOnCount(){ + + for(int i=1;i<11;i++){ + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1); + } + + doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue(0.7); + doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue(0.9); + doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue(1.0); + + TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext); + rateLimiter.setAcquireCount(Long.MAX_VALUE-1); + rateLimiter.setPollOnCount(Long.MAX_VALUE); + + rateLimiter.acquire(); + + assertThat(rateLimiter.getTxCreationLimit(), approximately(101)); + + assertEquals(-9223372036854775759L, rateLimiter.getPollOnCount()); + + for(int i=0;i<50;i++){ + rateLimiter.acquire(); + } + + verify(commitTimer, times(2)).getSnapshot(); + + } + + public Matcher approximately(final double val){ + return new BaseMatcher() { + @Override + public boolean matches(Object o) { + Double aDouble = (Double) o; + return aDouble >= val && aDouble <= val+1; + } + + @Override + public void describeTo(Description description) { + description.appendText("> " + val +" < " + (val+1)); + } + }; + } + + +} \ No newline at end of file