--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+public interface OperationCallback {
+ void run();
+ void success();
+ void failure();
+}
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@Override
public ListenableFuture<Void> commit() {
OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
- new CommitCallback(actorContext);
+ new TransactionRateLimitingCallback(actorContext);
return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
List<Future<ActorSelection>> getCohortFutures() {
return Collections.unmodifiableList(cohortFutures);
}
-
- private static interface OperationCallback {
- void run();
- void success();
- void failure();
- }
-
- private static class CommitCallback implements OperationCallback{
-
- private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
- private static final String COMMIT = "commit";
-
- private final Timer commitTimer;
- private final ActorContext actorContext;
- private Timer.Context timerContext;
-
- CommitCallback(ActorContext actorContext){
- this.actorContext = actorContext;
- commitTimer = actorContext.getOperationTimer(COMMIT);
- }
-
- @Override
- public void run() {
- timerContext = commitTimer.time();
- }
-
- @Override
- public void success() {
- timerContext.stop();
-
- Snapshot timerSnapshot = commitTimer.getSnapshot();
- double allowedLatencyInNanos = timerSnapshot.get95thPercentile();
-
- long commitTimeoutInSeconds = actorContext.getDatastoreContext()
- .getShardTransactionCommitTimeoutInSeconds();
- long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
-
- // Here we are trying to find out how many transactions per second are allowed
- double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
-
- LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
- actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
-
- actorContext.setTxCreationLimit(newRateLimit);
- }
-
- @Override
- public void failure() {
- // This would mean we couldn't get a transaction completed in 30 seconds which is
- // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
- // not going to be useful - so we leave it as it is
- }
- }
-
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TransactionRateLimitingCallback computes the new transaction rate limit on the successful completion of a
+ * transaction
+ */
+public class TransactionRateLimitingCallback implements OperationCallback{
+
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionRateLimitingCallback.class);
+ private static final String COMMIT = "commit";
+
+ private final Timer commitTimer;
+ private final ActorContext actorContext;
+ private Timer.Context timerContext;
+
+ TransactionRateLimitingCallback(ActorContext actorContext){
+ this.actorContext = actorContext;
+ commitTimer = actorContext.getOperationTimer(COMMIT);
+ }
+
+ @Override
+ public void run() {
+ timerContext = commitTimer.time();
+ }
+
+ @Override
+ public void success() {
+ Preconditions.checkState(timerContext != null, "Call run before success");
+ timerContext.stop();
+
+ Snapshot timerSnapshot = commitTimer.getSnapshot();
+ double newRateLimit = 0;
+
+ long commitTimeoutInSeconds = actorContext.getDatastoreContext()
+ .getShardTransactionCommitTimeoutInSeconds();
+ long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+ // Find the time that it takes for transactions to get executed in every 10th percentile
+ // Compute the rate limit for that percentile and sum it up
+ for(int i=1;i<=10;i++){
+ // Get the amount of time transactions take in the i*10th percentile
+ double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D);
+
+ if(percentileTimeInNanos > 0) {
+ // Figure out the rate limit for the i*10th percentile in nanos
+ double percentileRateLimit = ((double) commitTimeoutInNanos / percentileTimeInNanos);
+
+ // Add the percentileRateLimit to the total rate limit
+ newRateLimit += percentileRateLimit;
+ }
+ }
+
+ // Compute the rate limit per second
+ newRateLimit = newRateLimit/(commitTimeoutInSeconds*10);
+
+ LOG.debug("Data Store {} commit rateLimit adjusted to {}", actorContext.getDataStoreType(), newRateLimit);
+
+ actorContext.setTxCreationLimit(newRateLimit);
+ }
+
+ @Override
+ public void failure() {
+ // This would mean we couldn't get a transaction completed in 30 seconds which is
+ // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
+ // not going to be useful - so we leave it as it is
+ }
+}
\ No newline at end of file
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import akka.actor.ActorPath;
doReturn(getSystem()).when(actorContext).getActorSystem();
doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
doReturn(datastoreContext).when(actorContext).getDatastoreContext();
- doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+ doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
doReturn(commitTimerContext).when(commitTimer).time();
doReturn(commitSnapshot).when(commitTimer).getSnapshot();
- doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get95thPercentile();
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
doReturn(10.0).when(actorContext).getTxCreationLimit();
}
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
- // Verify that the creation limit was changed to 0.5 (based on setup)
- verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
}
@Test
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyDouble;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import java.util.concurrent.TimeUnit;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+
+public class TransactionRateLimitingCommitCallbackTest {
+
+ @Mock
+ public ActorContext actorContext;
+
+ @Mock
+ public DatastoreContext datastoreContext;
+
+ @Mock
+ public Timer commitTimer;
+
+ @Mock
+ private Timer.Context commitTimerContext;
+
+ @Mock
+ private Snapshot commitSnapshot;
+
+ @Before
+ public void setUp(){
+ MockitoAnnotations.initMocks(this);
+ doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+ doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+ doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+ doReturn(commitTimerContext).when(commitTimer).time();
+ doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+ }
+
+ @Test
+ public void testSuccess(){
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+
+ TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+ commitCallback.run();
+ commitCallback.success();
+
+ verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(292)));
+
+ doReturn(TimeUnit.MILLISECONDS.toNanos(0) * 1D).when(commitSnapshot).getValue(0.1);
+
+ commitCallback = new TransactionRateLimitingCallback(actorContext);
+ commitCallback.run();
+ commitCallback.success();
+
+ verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(292)));
+ }
+
+ @Test
+ public void testSuccessPercentileValueZero(){
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ doReturn(TimeUnit.MILLISECONDS.toNanos(0) * 1D).when(commitSnapshot).getValue(0.1);
+
+ TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+ commitCallback.run();
+ commitCallback.success();
+
+ verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(192)));
+ }
+
+ @Test
+ public void testSuccessOnePercentileValueVeryHigh(){
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ // ten seconds
+ doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(1.0);
+
+ TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+ commitCallback.run();
+ commitCallback.success();
+
+ verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(282)));
+ }
+
+ @Test
+ public void testSuccessWithAllPercentileValueVeryHigh(){
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(10000) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+ commitCallback.run();
+ commitCallback.success();
+
+ verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(0)));
+ }
+
+ @Test
+ public void testSuccessWithRealPercentileValues(){
+
+ for(int i=1;i<11;i++){
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(8) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
+
+ doReturn(TimeUnit.MILLISECONDS.toNanos(20) * 1D).when(commitSnapshot).getValue( 0.7);
+ doReturn(TimeUnit.MILLISECONDS.toNanos(100) * 1D).when(commitSnapshot).getValue( 0.9);
+ doReturn(TimeUnit.MILLISECONDS.toNanos(200) * 1D).when(commitSnapshot).getValue( 1.0);
+
+ TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+ commitCallback.run();
+ commitCallback.success();
+
+ verify(actorContext).setTxCreationLimit(Matchers.doubleThat(approximately(101)));
+ }
+
+
+ @Test
+ public void testSuccessWithoutRun(){
+ TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+
+ try {
+ commitCallback.success();
+ fail("Expected IllegalStateException");
+ } catch(IllegalStateException e){
+
+ }
+
+ verify(actorContext, never()).setTxCreationLimit(anyDouble());
+
+ }
+
+
+ @Test
+ public void testFailure(){
+ TransactionRateLimitingCallback commitCallback = new TransactionRateLimitingCallback(actorContext);
+ commitCallback.run();
+ commitCallback.failure();
+
+ verify(actorContext, never()).setTxCreationLimit(anyDouble());
+
+ }
+
+ public Matcher<Double> approximately(final double val){
+ return new BaseMatcher<Double>() {
+ @Override
+ public boolean matches(Object o) {
+ Double aDouble = (Double) o;
+ return aDouble > val && aDouble < val+1;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+
+ }
+ };
+ }
+
+
+}
\ No newline at end of file