BUG 2762 : Compute rate limit based on a more lenient algorithm 98/15798/3
authorMoiz Raja <moraja@cisco.com>
Thu, 26 Feb 2015 20:50:02 +0000 (12:50 -0800)
committerMoiz Raja <moraja@cisco.com>
Thu, 26 Feb 2015 23:57:46 +0000 (15:57 -0800)
Calculate the rate limit by looking at each percentile tenth instead of
a single value. This provides a more accurate limit.

Change-Id: Ic71476613ab966c716c203768f3ff395cb251ed0
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java
new file mode 100644 (file)
index 0000000..b944536
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+public interface OperationCallback {
+    void run();
+    void success();
+    void failure();
+}
index 4445b14e2edc1b48b5c92012e8a659275357f433..c479da73127760977d4c27b3ff9873d10c295c57 100644 (file)
@@ -11,15 +11,12 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -195,7 +192,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     @Override
     public ListenableFuture<Void> commit() {
         OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
-                new CommitCallback(actorContext);
+                new TransactionRateLimitingCallback(actorContext);
 
         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
                 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
@@ -311,58 +308,4 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     List<Future<ActorSelection>> getCohortFutures() {
         return Collections.unmodifiableList(cohortFutures);
     }
-
-    private static interface OperationCallback {
-        void run();
-        void success();
-        void failure();
-    }
-
-    private static class CommitCallback implements OperationCallback{
-
-        private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
-        private static final String COMMIT = "commit";
-
-        private final Timer commitTimer;
-        private final ActorContext actorContext;
-        private Timer.Context timerContext;
-
-        CommitCallback(ActorContext actorContext){
-            this.actorContext = actorContext;
-            commitTimer = actorContext.getOperationTimer(COMMIT);
-        }
-
-        @Override
-        public void run() {
-            timerContext = commitTimer.time();
-        }
-
-        @Override
-        public void success() {
-            timerContext.stop();
-
-            Snapshot timerSnapshot = commitTimer.getSnapshot();
-            double allowedLatencyInNanos = timerSnapshot.get95thPercentile();
-
-            long commitTimeoutInSeconds = actorContext.getDatastoreContext()
-                    .getShardTransactionCommitTimeoutInSeconds();
-            long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
-
-            // Here we are trying to find out how many transactions per second are allowed
-            double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
-
-            LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
-                    actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
-
-            actorContext.setTxCreationLimit(newRateLimit);
-        }
-
-        @Override
-        public void failure() {
-            // This would mean we couldn't get a transaction completed in 30 seconds which is
-            // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
-            // not going to be useful - so we leave it as it is
-        }
-    }
-
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java
new file mode 100644 (file)
index 0000000..1202a90
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TransactionRateLimitingCallback computes the new transaction rate limit on the successful completion of a
+ * transaction
+ */
+public class TransactionRateLimitingCallback implements OperationCallback{
+
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionRateLimitingCallback.class);
+    private static final String COMMIT = "commit";
+
+    private final Timer commitTimer;
+    private final ActorContext actorContext;
+    private Timer.Context timerContext;
+
+    TransactionRateLimitingCallback(ActorContext actorContext){
+        this.actorContext = actorContext;
+        commitTimer = actorContext.getOperationTimer(COMMIT);
+    }
+
+    @Override
+    public void run() {
+        timerContext = commitTimer.time();
+    }
+
+    @Override
+    public void success() {
+        Preconditions.checkState(timerContext != null, "Call run before success");
+        timerContext.stop();
+
+        Snapshot timerSnapshot = commitTimer.getSnapshot();
+        double newRateLimit = 0;
+
+        long commitTimeoutInSeconds = actorContext.getDatastoreContext()
+                .getShardTransactionCommitTimeoutInSeconds();
+        long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+        // Find the time that it takes for transactions to get executed in every 10th percentile
+        // Compute the rate limit for that percentile and sum it up
+        for(int i=1;i<=10;i++){
+            // Get the amount of time transactions take in the i*10th percentile
+            double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D);
+
+            if(percentileTimeInNanos > 0) {
+                // Figure out the rate limit for the i*10th percentile in nanos
+                double percentileRateLimit = ((double) commitTimeoutInNanos / percentileTimeInNanos);
+
+                // Add the percentileRateLimit to the total rate limit
+                newRateLimit += percentileRateLimit;
+            }
+        }
+
+        // Compute the rate limit per second
+        newRateLimit = newRateLimit/(commitTimeoutInSeconds*10);
+
+        LOG.debug("Data Store {} commit rateLimit adjusted to {}", actorContext.getDataStoreType(), newRateLimit);
+
+        actorContext.setTxCreationLimit(newRateLimit);
+    }
+
+    @Override
+    public void failure() {
+        // This would mean we couldn't get a transaction completed in 30 seconds which is
+        // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
+        // not going to be useful - so we leave it as it is
+    }
+}
\ No newline at end of file
index 0a2a0d1bc0595a8932f2e2ee3ab27c6b8f422855..647b6e7b542508953bb9750555aaf193c0ba2864 100644 (file)
@@ -7,7 +7,6 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorPath;
@@ -68,11 +67,15 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         doReturn(getSystem()).when(actorContext).getActorSystem();
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
         doReturn(datastoreContext).when(actorContext).getDatastoreContext();
-        doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+        doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
         doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
         doReturn(commitTimerContext).when(commitTimer).time();
         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
-        doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get95thPercentile();
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
         doReturn(10.0).when(actorContext).getTxCreationLimit();
     }
 
@@ -332,8 +335,6 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
 
-        // Verify that the creation limit was changed to 0.5 (based on setup)
-        verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
     }
 
     @Test
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCommitCallbackTest.java
new file mode 100644 (file)
index 0000000..b037627
--- /dev/null
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyDouble;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import java.util.concurrent.TimeUnit;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+
+public class TransactionRateLimitingCommitCallbackTest {
+
+    @Mock
+    public ActorContext actorContext;
+
+    @Mock
+    public DatastoreContext datastoreContext;
+
+    @Mock
+    public Timer commitTimer;
+
+    @Mock
+    private Timer.Context commitTimerContext;
+
+    @Mock
+    private Snapshot commitSnapshot;
+
+    @Before
+    public void setUp(){
+        MockitoAnnotations.initMocks(this);
+        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+        doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+        doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+        doReturn(commitTimerContext).when(commitTimer).time();
+        doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+    }
+
+    @Test
+    public void testSuccess(){
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+
+        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+        commitCallback.run();
+        commitCallback.success();
+
+        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(292)));
+
+        doReturn(TimeUnit.MILLISECONDS.toNanos(0) * 1D).when(commitSnapshot).getValue(0.1);
+
+        commitCallback = new TransactionRateLimitingCallback(actorContext);
+        commitCallback.run();
+        commitCallback.success();
+
+        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(292)));
+    }
+
+    @Test
+    public void testSuccessPercentileValueZero(){
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        doReturn(TimeUnit.MILLISECONDS.toNanos(0) * 1D).when(commitSnapshot).getValue(0.1);
+
+        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+        commitCallback.run();
+        commitCallback.success();
+
+        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(192)));
+    }
+
+    @Test
+    public void testSuccessOnePercentileValueVeryHigh(){
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        // ten seconds
+        doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(1.0);
+
+        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+        commitCallback.run();
+        commitCallback.success();
+
+        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(282)));
+    }
+
+    @Test
+    public void testSuccessWithAllPercentileValueVeryHigh(){
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+        commitCallback.run();
+        commitCallback.success();
+
+        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(0)));
+    }
+
+    @Test
+    public void testSuccessWithRealPercentileValues(){
+
+        for(int i=1;i<11;i++){
+            // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+            // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+            doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1);
+        }
+
+        doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue( 0.7);
+        doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue( 0.9);
+        doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue( 1.0);
+
+        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+        commitCallback.run();
+        commitCallback.success();
+
+        verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(101)));
+    }
+
+
+    @Test
+    public void testSuccessWithoutRun(){
+        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+
+        try {
+            commitCallback.success();
+            fail("Expected IllegalStateException");
+        } catch(IllegalStateException e){
+
+        }
+
+        verify(actorContext, never()).setTxCreationLimit(anyDouble());
+
+    }
+
+
+    @Test
+    public void testFailure(){
+        TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+        commitCallback.run();
+        commitCallback.failure();
+
+        verify(actorContext, never()).setTxCreationLimit(anyDouble());
+
+    }
+
+    public Matcher<Double> approximately(final double val){
+        return new BaseMatcher<Double>() {
+            @Override
+            public boolean matches(Object o) {
+                Double aDouble = (Double) o;
+                return aDouble > val && aDouble < val+1;
+            }
+
+            @Override
+            public void describeTo(Description description) {
+
+            }
+        };
+    }
+
+
+}
\ No newline at end of file