BUG 3125 : Set Rate Limit just before acquiring a permit to avoid contention 63/20063/11
authorMoiz Raja <moraja@cisco.com>
Mon, 11 May 2015 22:43:43 +0000 (15:43 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 15 May 2015 19:35:50 +0000 (19:35 +0000)
During perfomance test performance is hampered by the rate limiting code. To
avoid the penalty of possible contention when setting the rate limit in a
separate thread from the one acquiring a rate permit this patch calculates the
new rate limit and sets it on the rate limiter on the same thread which
acquires the rate limit.

Moving the setting of the rate limit into the same thread which does the
acquiring of the permit did not have any discernable effect on performance so
I continued playing around and found that simply calculating the rate limit
can cause issues with performance. This happens because the Coda Hale metrics
Timer that we use to help calculate the rate limit is also synchronized
internally with a read write lock and that causes contention.

To fix the situation with the rate limit calculation I do not calculate the rate
limit everytime someone tries to acquire a permit but only after acquiring about 1/2 of
the last calculated rate limit.

To make it easier to understand/test the rate limiting code I have slightly refactored
the code. All the transaction rate limiting code is now in TransactionRateLimiter.

After making these changes I was able to get the same performance in the dsBenchmark
test as you would get from not using a rate limiter at all.

Change-Id: Ia7639ac3acdc08140fbf5eef03120f857dc44994
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/TransactionRateLimiter.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TransactionRateLimiterTest.java [new file with mode: 0644]

index 82258b46a44b237748342b9d43e8373cb04c0dde..eca930a03d89c3533f59dd441257197357b93bb4 100644 (file)
@@ -244,7 +244,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         final AbstractThreePhaseCommitCohort<?> ret;
         switch (txContextAdapters.size()) {
         case 0:
-            TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(txContextFactory.getActorContext());
             ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
             break;
         case 1:
index 526ce59aabe370246b2742bf068d25b9befcc764..678891446ab9b3da8c87315497bb1f044b681897 100644 (file)
@@ -8,30 +8,20 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
-import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * TransactionRateLimitingCallback computes the new transaction rate limit on the successful completion of a
  * transaction
  */
 public class TransactionRateLimitingCallback implements OperationCallback{
-
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionRateLimitingCallback.class);
-    private static final String COMMIT = "commit";
-
     private final Timer commitTimer;
-    private final ActorContext actorContext;
     private Timer.Context timerContext;
 
     TransactionRateLimitingCallback(ActorContext actorContext){
-        this.actorContext = actorContext;
-        commitTimer = actorContext.getOperationTimer(COMMIT);
+        commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT);
     }
 
     @Override
@@ -43,12 +33,6 @@ public class TransactionRateLimitingCallback implements OperationCallback{
     public void success() {
         Preconditions.checkState(timerContext != null, "Call run before success");
         timerContext.stop();
-
-        double newRateLimit = calculateNewRateLimit(commitTimer, actorContext.getDatastoreContext());
-
-        LOG.debug("Data Store {} commit rateLimit adjusted to {}", actorContext.getDataStoreType(), newRateLimit);
-
-        actorContext.setTxCreationLimit(newRateLimit);
     }
 
     @Override
@@ -58,66 +42,4 @@ public class TransactionRateLimitingCallback implements OperationCallback{
         // not going to be useful - so we leave it as it is
     }
 
-    private static double calculateNewRateLimit(Timer commitTimer, DatastoreContext context) {
-        if(commitTimer == null) {
-            // This can happen in unit tests.
-            return 0;
-        }
-
-        Snapshot timerSnapshot = commitTimer.getSnapshot();
-        double newRateLimit = 0;
-
-        long commitTimeoutInSeconds = context.getShardTransactionCommitTimeoutInSeconds();
-        long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
-
-        // Find the time that it takes for transactions to get executed in every 10th percentile
-        // Compute the rate limit for that percentile and sum it up
-        for(int i=1;i<=10;i++){
-            // Get the amount of time transactions take in the i*10th percentile
-            double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D);
-
-            if(percentileTimeInNanos > 0) {
-                // Figure out the rate limit for the i*10th percentile in nanos
-                double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos);
-
-                // Add the percentileRateLimit to the total rate limit
-                newRateLimit += percentileRateLimit;
-            }
-        }
-
-        // Compute the rate limit per second
-        return newRateLimit/(commitTimeoutInSeconds*10);
-    }
-
-    public static void adjustRateLimitForUnusedTransaction(ActorContext actorContext) {
-        // Unused transactions in one data store can artificially limit the rate for other data stores
-        // if the first data store's rate is still at a lower initial rate since the front-end creates
-        // transactions in each data store up-front even though the client may not actually submit changes.
-        // So we may have to adjust the rate for data stores with unused transactions.
-
-        // First calculate the current rate for the data store. If it's 0 then there have been no
-        // actual transactions committed to the data store.
-
-        double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(COMMIT),
-                actorContext.getDatastoreContext());
-        if(newRateLimit == 0.0) {
-            // Since we have no rate data for unused Tx's data store, adjust to the rate from another
-            // data store that does have rate data.
-            for(String datastoreType: DatastoreContext.getGlobalDatastoreTypes()) {
-                if(datastoreType.equals(actorContext.getDataStoreType())) {
-                    continue;
-                }
-
-                newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(datastoreType, COMMIT),
-                        actorContext.getDatastoreContext());
-                if(newRateLimit > 0.0) {
-                    LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}",
-                            actorContext.getDataStoreType(), newRateLimit);
-
-                    actorContext.setTxCreationLimit(newRateLimit);
-                    break;
-                }
-            }
-        }
-    }
 }
\ No newline at end of file
index ad05a1ca71001285a8aed1f5afe16b1f511f3e96..5b4f54daf8ea9b56f43e1afa1eb3f38dc9b5c1fd 100644 (file)
@@ -30,7 +30,6 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.RateLimiter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
@@ -90,6 +89,7 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
         }
     };
     public static final String MAILBOX = "bounded-mailbox";
+    public static final String COMMIT = "commit";
 
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
@@ -99,7 +99,7 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
     private FiniteDuration operationDuration;
     private Timeout operationTimeout;
     private final String selfAddressHostPort;
-    private RateLimiter txRateLimiter;
+    private TransactionRateLimiter txRateLimiter;
     private final int transactionOutstandingOperationLimit;
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
@@ -141,7 +141,7 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
     }
 
     private void setCachedProperties() {
-        txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
+        txRateLimiter = new TransactionRateLimiter(this);
 
         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
         operationTimeout = new Timeout(operationDuration);
@@ -519,21 +519,12 @@ public class ActorContext implements RemovalListener<String, Future<PrimaryShard
         return datastoreContext.getDataStoreType();
     }
 
-    /**
-     * Set the number of transaction creation permits that are to be allowed
-     *
-     * @param permitsPerSecond
-     */
-    public void setTxCreationLimit(double permitsPerSecond){
-        txRateLimiter.setRate(permitsPerSecond);
-    }
-
     /**
      * Get the current transaction creation rate limit
      * @return
      */
     public double getTxCreationLimit(){
-        return txRateLimiter.getRate();
+        return txRateLimiter.getTxCreationLimit();
     }
 
     /**
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/TransactionRateLimiter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/TransactionRateLimiter.java
new file mode 100644 (file)
index 0000000..d7cbce7
--- /dev/null
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TransactionRateLimiter {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionRateLimiter.class);
+
+    private final ActorContext actorContext;
+    private final long commitTimeoutInSeconds;
+    private final String dataStoreType;
+    private final RateLimiter txRateLimiter;
+    private final AtomicLong acquireCount = new AtomicLong();
+
+    private volatile long pollOnCount = 1;
+
+    public TransactionRateLimiter(ActorContext actorContext){
+        this.actorContext = actorContext;
+        this.commitTimeoutInSeconds = actorContext.getDatastoreContext().getShardTransactionCommitTimeoutInSeconds();
+        this.dataStoreType = actorContext.getDataStoreType();
+        this.txRateLimiter = RateLimiter.create(actorContext.getDatastoreContext().getTransactionCreationInitialRateLimit());
+    }
+
+    public void acquire(){
+        adjustRateLimit();
+        txRateLimiter.acquire();
+    }
+
+    private void adjustRateLimit() {
+        final long count = acquireCount.incrementAndGet();
+        if(count >= pollOnCount) {
+            final Timer commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT);
+            double newRateLimit = calculateNewRateLimit(commitTimer, commitTimeoutInSeconds);
+
+            if (newRateLimit < 1.0) {
+                newRateLimit = getRateLimitFromOtherDataStores();
+            }
+
+            if (newRateLimit >= 1.0) {
+                txRateLimiter.setRate(newRateLimit);
+                pollOnCount = count + ((long) newRateLimit/2);
+            }
+        }
+    }
+
+    public double getTxCreationLimit(){
+        return txRateLimiter.getRate();
+    }
+
+    private double getRateLimitFromOtherDataStores(){
+        // Since we have no rate data for unused Tx's data store, adjust to the rate from another
+        // data store that does have rate data.
+        for(String datastoreType: DatastoreContext.getGlobalDatastoreTypes()) {
+            if(datastoreType.equals(this.dataStoreType)) {
+                continue;
+            }
+
+            double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(datastoreType, ActorContext.COMMIT),
+                    this.commitTimeoutInSeconds);
+            if(newRateLimit > 0.0) {
+                LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}",
+                        this.dataStoreType, newRateLimit);
+
+                return newRateLimit;
+            }
+        }
+
+        return -1.0D;
+    }
+
+    private double calculateNewRateLimit(Timer commitTimer, long commitTimeoutInSeconds) {
+        if(commitTimer == null) {
+            // This can happen in unit tests.
+            return 0;
+        }
+
+        Snapshot timerSnapshot = commitTimer.getSnapshot();
+        double newRateLimit = 0;
+
+        long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+        // Find the time that it takes for transactions to get executed in every 10th percentile
+        // Compute the rate limit for that percentile and sum it up
+        for(int i=1;i<=10;i++){
+            // Get the amount of time transactions take in the i*10th percentile
+            double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D);
+
+            if(percentileTimeInNanos > 0) {
+                // Figure out the rate limit for the i*10th percentile in nanos
+                double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos);
+
+                // Add the percentileRateLimit to the total rate limit
+                newRateLimit += percentileRateLimit;
+            }
+        }
+
+        // Compute the rate limit per second
+        return newRateLimit/(commitTimeoutInSeconds*10);
+    }
+
+    @VisibleForTesting
+    long getPollOnCount() {
+        return pollOnCount;
+    }
+
+    @VisibleForTesting
+    void setPollOnCount(long value){
+        pollOnCount = value;
+    }
+
+    @VisibleForTesting
+    void setAcquireCount(long value){
+        acquireCount.set(value);
+    }
+
+}
index d595adc8bb80c0a98701526b2240b757c1f2d6c6..cb3bd60fbf15fb3a4a6018cc9b2a8f4bcca4546b 100644 (file)
@@ -3,10 +3,8 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorPath;
@@ -312,7 +310,6 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
             propagateExecutionExceptionCause(proxy.commit());
         } finally {
 
-            verify(actorContext, never()).setTxCreationLimit(anyLong());
             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
         }
 
@@ -354,6 +351,5 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         proxy.preCommit().get(5, TimeUnit.SECONDS);
         proxy.commit().get(5, TimeUnit.SECONDS);
 
-        verify(actorContext, never()).setTxCreationLimit(anyLong());
     }
 }
index 69a023ad3158ebbbe0e1fd04c924d0358851a0d9..de3a56d0782dafb6ee377a5284b4e35ecbb77811 100644 (file)
@@ -9,26 +9,18 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyDouble;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
-import java.util.concurrent.TimeUnit;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Matchers;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 
 public class TransactionRateLimitingCommitCallbackTest {
-
     @Mock
     public ActorContext actorContext;
 
@@ -56,95 +48,13 @@ public class TransactionRateLimitingCommitCallbackTest {
 
     @Test
     public void testSuccess(){
-
-        for(int i=1;i<11;i++){
-            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
-            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
-            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
-        }
-
-
-        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
-        commitCallback.run();
-        commitCallback.success();
-
-        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(292)));
-    }
-
-    @Test
-    public void testSuccessPercentileValueZero(){
-
-        for(int i=1;i<11;i++){
-            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
-            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
-            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
-        }
-
-        doReturn(TimeUnit.MILLISECONDS.toNanos(0) * 1D).when(commitSnapshot).getValue(0.1);
-
-        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
-        commitCallback.run();
-        commitCallback.success();
-
-        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(192)));
-    }
-
-    @Test
-    public void testSuccessOnePercentileValueVeryHigh(){
-
-        for(int i=1;i<11;i++){
-            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
-            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
-            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
-        }
-
-        // ten seconds
-        doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(1.0);
-
-        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
-        commitCallback.run();
-        commitCallback.success();
-
-        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(282)));
-    }
-
-    @Test
-    public void testSuccessWithAllPercentileValueVeryHigh(){
-
-        for(int i=1;i<11;i++){
-            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
-            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
-            doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(i * 0.1);
-        }
-
-        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
-        commitCallback.run();
-        commitCallback.success();
-
-        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(0)));
-    }
-
-    @Test
-    public void testSuccessWithRealPercentileValues(){
-
-        for(int i=1;i<11;i++){
-            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
-            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
-            doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1);
-        }
-
-        doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue( 0.7);
-        doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue( 0.9);
-        doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue( 1.0);
-
         TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
         commitCallback.run();
         commitCallback.success();
 
-        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(101)));
+        verify(commitTimerContext).stop();
     }
 
-
     @Test
     public void testSuccessWithoutRun(){
         TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
@@ -155,9 +65,7 @@ public class TransactionRateLimitingCommitCallbackTest {
         } catch(IllegalStateException e){
 
         }
-
-        verify(actorContext, never()).setTxCreationLimit(anyDouble());
-
+        verify(commitTimerContext, never()).stop();
     }
 
 
@@ -167,60 +75,7 @@ public class TransactionRateLimitingCommitCallbackTest {
         commitCallback.run();
         commitCallback.failure();
 
-        verify(actorContext, never()).setTxCreationLimit(anyDouble());
-
-    }
-
-    @Test
-    public void testAdjustRateLimitForUnusedTransaction() {
-        doReturn(commitTimer).when(actorContext).getOperationTimer("one", "commit");
-        doReturn("one").when(actorContext).getDataStoreType();
-
-        Timer commitTimer2 = Mockito.mock(Timer.class);
-        Snapshot commitSnapshot2 = Mockito.mock(Snapshot.class);
-
-        doReturn(commitSnapshot2).when(commitTimer2).getSnapshot();
-
-        doReturn(commitTimer2).when(actorContext).getOperationTimer("two", "commit");
-
-        DatastoreContext.newBuilder().dataStoreType("one").build();
-        DatastoreContext.newBuilder().dataStoreType("two").build();
-
-        doReturn(TimeUnit.MICROSECONDS.toNanos(500) * 1D).when(commitSnapshot).getValue(1 * 0.1);
-
-        TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
-
-        verify(actorContext, never()).setTxCreationLimit(anyDouble());
-
-        Mockito.reset(commitSnapshot);
-
-        TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
-
-        verify(actorContext, never()).setTxCreationLimit(anyDouble());
-
-        System.out.println(""+TimeUnit.SECONDS.toNanos(30)/TimeUnit.MICROSECONDS.toNanos(100));
-
-        doReturn(TimeUnit.MICROSECONDS.toNanos(100) * 1D).when(commitSnapshot2).getValue(1 * 0.1);
-
-        TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
-
-        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(1000)));
-    }
-
-    public Matcher<Double> approximately(final double val){
-        return new BaseMatcher<Double>() {
-            @Override
-            public boolean matches(Object o) {
-                Double aDouble = (Double) o;
-                return aDouble >= val && aDouble <= val+1;
-            }
-
-            @Override
-            public void describeTo(Description description) {
-                description.appendText("> " + val +" < " + (val+1));
-            }
-        };
+        verify(commitTimerContext, never()).stop();
     }
 
-
 }
\ No newline at end of file
index 031463b2b958efc9803830a56ee4436eacd32674..377c05bf8c2eaed0bd6ac6b4278b780d20c95499 100644 (file)
@@ -28,7 +28,6 @@ import com.typesafe.config.ConfigFactory;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.time.StopWatch;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -327,35 +326,6 @@ public class ActorContextTest extends AbstractActorTest{
         assertEquals(expected, actual);
     }
 
-    @Test
-    public void testRateLimiting(){
-        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
-                transactionCreationInitialRateLimit(155L).build();
-
-        ActorContext actorContext =
-                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), dataStoreContext);
-
-        // Check that the initial value is being picked up from DataStoreContext
-        assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
-
-        actorContext.setTxCreationLimit(1.0);
-
-        assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
-
-
-        StopWatch watch = new StopWatch();
-
-        watch.start();
-
-        actorContext.acquireTxCreationPermit();
-        actorContext.acquireTxCreationPermit();
-        actorContext.acquireTxCreationPermit();
-
-        watch.stop();
-
-        assertTrue("did not take as much time as expected", watch.getTime() > 1000);
-    }
 
     @Test
     public void testClientDispatcherIsGlobalDispatcher(){
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TransactionRateLimiterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TransactionRateLimiterTest.java
new file mode 100644 (file)
index 0000000..2c89716
--- /dev/null
@@ -0,0 +1,299 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.time.StopWatch;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+
+public class TransactionRateLimiterTest {
+
+    @Mock
+    public ActorContext actorContext;
+
+    @Mock
+    public DatastoreContext datastoreContext;
+
+    @Mock
+    public Timer commitTimer;
+
+    @Mock
+    private Timer.Context commitTimerContext;
+
+    @Mock
+    private Snapshot commitSnapshot;
+
+    @Before
+    public void setUp(){
+        MockitoAnnotations.initMocks(this);
+        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+        doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+        doReturn(100L).when(datastoreContext).getTransactionCreationInitialRateLimit();
+        doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+        doReturn(commitTimerContext).when(commitTimer).time();
+        doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+    }
+
+    @Test
+    public void testAcquireRateLimitChanged(){
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+        rateLimiter.acquire();
+
+        assertThat(rateLimiter.getTxCreationLimit(), approximately(292));
+
+        assertEquals(147, rateLimiter.getPollOnCount());
+    }
+
+
+    @Test
+    public void testAcquirePercentileValueZero(){
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        doReturn(TimeUnit.MILLISECONDS.toNanos(0) * 1D).when(commitSnapshot).getValue(0.1);
+
+        TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+        rateLimiter.acquire();
+
+        assertThat(rateLimiter.getTxCreationLimit(), approximately(192));
+
+        assertEquals(97, rateLimiter.getPollOnCount());
+    }
+
+    @Test
+    public void testAcquireOnePercentileValueVeryHigh(){
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        // ten seconds
+        doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(1.0);
+
+        TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+        rateLimiter.acquire();
+
+        assertThat(rateLimiter.getTxCreationLimit(), approximately(282));
+
+        assertEquals(142, rateLimiter.getPollOnCount());
+    }
+
+    @Test
+    public void testAcquireWithAllPercentileValueVeryHigh(){
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+        rateLimiter.acquire();
+
+        // The initial rate limit will be retained here because the calculated rate limit was too small
+        assertThat(rateLimiter.getTxCreationLimit(), approximately(100));
+
+        assertEquals(1, rateLimiter.getPollOnCount());
+    }
+
+    @Test
+    public void testAcquireWithRealPercentileValues(){
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue(0.7);
+        doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue(0.9);
+        doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue(1.0);
+
+        TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+        rateLimiter.acquire();
+
+        assertThat(rateLimiter.getTxCreationLimit(), approximately(101));
+
+        assertEquals(51, rateLimiter.getPollOnCount());
+    }
+
+
+
+    @Test
+    public void testAcquireGetRateLimitFromOtherDataStores(){
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(0.0D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        Timer operationalCommitTimer = mock(Timer.class);
+        Timer.Context operationalCommitTimerContext = mock(Timer.Context.class);
+        Snapshot operationalCommitSnapshot = mock(Snapshot.class);
+
+        doReturn(operationalCommitTimer).when(actorContext).getOperationTimer("operational", "commit");
+        doReturn(operationalCommitTimerContext).when(operationalCommitTimer).time();
+        doReturn(operationalCommitSnapshot).when(operationalCommitTimer).getSnapshot();
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(operationalCommitSnapshot).getValue(i * 0.1);
+        }
+
+
+        DatastoreContext.getGlobalDatastoreTypes().add("config");
+        DatastoreContext.getGlobalDatastoreTypes().add("operational");
+
+        TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+        rateLimiter.acquire();
+
+        assertThat(rateLimiter.getTxCreationLimit(), approximately(292));
+
+        assertEquals(147, rateLimiter.getPollOnCount());
+    }
+
+    @Test
+    public void testRateLimiting(){
+
+        for(int i=1;i<11;i++){
+            doReturn(TimeUnit.SECONDS.toNanos(1) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+        StopWatch watch = new StopWatch();
+
+        watch.start();
+
+        rateLimiter.acquire();
+        rateLimiter.acquire();
+        rateLimiter.acquire();
+
+        watch.stop();
+
+        assertTrue("did not take as much time as expected rate limit : " + rateLimiter.getTxCreationLimit(),
+                watch.getTime() > 1000);
+    }
+
+    @Test
+    public void testRateLimitNotCalculatedUntilPollCountReached(){
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue(0.7);
+        doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue(0.9);
+        doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue(1.0);
+
+        TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+
+        rateLimiter.acquire();
+
+        assertThat(rateLimiter.getTxCreationLimit(), approximately(101));
+
+        assertEquals(51, rateLimiter.getPollOnCount());
+
+        for(int i=0;i<49;i++){
+            rateLimiter.acquire();
+        }
+
+        verify(commitTimer, times(1)).getSnapshot();
+
+        // Acquiring one more time will cause the re-calculation of the rate limit
+        rateLimiter.acquire();
+
+        verify(commitTimer, times(2)).getSnapshot();
+    }
+
+    @Test
+    public void testAcquireNegativeAcquireAndPollOnCount(){
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue(0.7);
+        doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue(0.9);
+        doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue(1.0);
+
+        TransactionRateLimiter rateLimiter = new TransactionRateLimiter(actorContext);
+        rateLimiter.setAcquireCount(Long.MAX_VALUE-1);
+        rateLimiter.setPollOnCount(Long.MAX_VALUE);
+
+        rateLimiter.acquire();
+
+        assertThat(rateLimiter.getTxCreationLimit(), approximately(101));
+
+        assertEquals(-9223372036854775759L, rateLimiter.getPollOnCount());
+
+        for(int i=0;i<50;i++){
+            rateLimiter.acquire();
+        }
+
+        verify(commitTimer, times(2)).getSnapshot();
+
+    }
+
+    public Matcher<Double> approximately(final double val){
+        return new BaseMatcher<Double>() {
+            @Override
+            public boolean matches(Object o) {
+                Double aDouble = (Double) o;
+                return aDouble >= val && aDouble <= val+1;
+            }
+
+            @Override
+            public void describeTo(Description description) {
+                description.appendText("> " + val +" < " + (val+1));
+            }
+        };
+    }
+
+
+}
\ No newline at end of file