BUG 2676 : Apply backpressure when creating transactions 70/14870/11
authorMoiz Raja <moraja@cisco.com>
Thu, 5 Feb 2015 02:19:14 +0000 (18:19 -0800)
committerMoiz Raja <moraja@cisco.com>
Wed, 11 Feb 2015 18:50:13 +0000 (10:50 -0800)
This patch uses a rate limiter to limit the number of write transactions
that can be created on the datastore at any given point of time (within a
second).

We also have added a metric that is trackable using JMX which will show
exactly how much time we take in committing a transaction.

Could this rate limiter have been on the DataBroker? Yes it could have been
but I think we need to go granular on this one. Ideally the ratelimiter would
be for every shard transaction but it's not trivial to do in a way that would
actually block the consumer for now - this is because creating a transaction on
Shard itself is an async operation.

Change-Id: I3d8e3baeb892ba494c0ab0d13a0d6226c1516511
Signed-off-by: Moiz Raja <moraja@cisco.com>
12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java

index 01e42dbb8e92400b7f4af73d55257d6f9ebbe4a2..d2e8b0efd80ed20981e84f30532380f0762b4e8b 100644 (file)
@@ -37,13 +37,15 @@ public class DatastoreContext {
     private final boolean persistent;
     private final ConfigurationReader configurationReader;
     private final long shardElectionTimeoutFactor;
+    private final long transactionCreationInitialRateLimit;
 
     private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
             ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds,
             Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds,
             int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout,
             Timeout shardLeaderElectionTimeout,
-            boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor) {
+            boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor,
+            long transactionCreationInitialRateLimit) {
         this.dataStoreProperties = dataStoreProperties;
         this.shardRaftConfig = shardRaftConfig;
         this.dataStoreMXBeanType = dataStoreMXBeanType;
@@ -56,6 +58,7 @@ public class DatastoreContext {
         this.persistent = persistent;
         this.configurationReader = configurationReader;
         this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+        this.transactionCreationInitialRateLimit = transactionCreationInitialRateLimit;
     }
 
     public static Builder newBuilder() {
@@ -110,6 +113,11 @@ public class DatastoreContext {
         return this.shardElectionTimeoutFactor;
     }
 
+
+    public long getTransactionCreationInitialRateLimit() {
+        return transactionCreationInitialRateLimit;
+    }
+
     public static class Builder {
         private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
         private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
@@ -127,6 +135,7 @@ public class DatastoreContext {
         private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
         private int shardSnapshotDataThresholdPercentage = 12;
         private long shardElectionTimeoutFactor = 2;
+        private long transactionCreationInitialRateLimit = 100;
 
         public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
             this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
@@ -209,6 +218,11 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder transactionCreationInitialRateLimit(long initialRateLimit){
+            this.transactionCreationInitialRateLimit = initialRateLimit;
+            return this;
+        }
+
 
         public DatastoreContext build() {
             DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
@@ -225,7 +239,7 @@ public class DatastoreContext {
                     operationTimeoutInSeconds, shardTransactionIdleTimeout,
                     shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity,
                     shardInitializationTimeout, shardLeaderElectionTimeout,
-                    persistent, configurationReader, shardElectionTimeoutFactor);
+                    persistent, configurationReader, shardElectionTimeoutFactor, transactionCreationInitialRateLimit);
         }
     }
 }
index 930c5f7257dc6c061f23f54868a55f8da68b4acb..e9038ed4f0f9ab5a37af554ef43215af8668e6c8 100644 (file)
@@ -54,7 +54,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
                 ShardManager.props(type, cluster, configuration, datastoreContext)
                     .withMailbox(ActorContext.MAILBOX), shardManagerId ),
-                cluster, configuration, datastoreContext);
+                cluster, configuration, datastoreContext, type);
     }
 
     public DistributedDataStore(ActorContext actorContext) {
@@ -94,11 +94,13 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        actorContext.acquireTxCreationPermit();
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        actorContext.acquireTxCreationPermit();
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
     }
 
index 932c36fe3469f00b5c1d5ed6b38706f48783117f..4f472266c1f56acbd8fc531ae7189f1ae91951b4 100644 (file)
@@ -11,12 +11,15 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -44,6 +47,19 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     private final List<Future<ActorSelection>> cohortFutures;
     private volatile List<ActorSelection> cohorts;
     private final String transactionId;
+    private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+        @Override
+        public void run() {
+        }
+
+        @Override
+        public void success() {
+        }
+
+        @Override
+        public void failure() {
+        }
+    };
 
     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
             List<Future<ActorSelection>> cohortFutures, String transactionId) {
@@ -151,8 +167,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
             }
-
-            futureList.add(actorContext.executeOperationAsync(cohort, message));
+            futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
         }
 
         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
@@ -179,12 +194,20 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     @Override
     public ListenableFuture<Void> commit() {
-        return voidOperation("commit",  new CommitTransaction(transactionId).toSerializable(),
-                CommitTransactionReply.SERIALIZABLE_CLASS, true);
+        OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
+                new CommitCallback(actorContext);
+
+        return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
+                CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
+    }
+
+    private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
+                                                 final Class<?> expectedResponseClass, final boolean propagateException) {
+        return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
     }
 
     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
-            final Class<?> expectedResponseClass, final boolean propagateException) {
+                                                 final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
 
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {} {}", transactionId, operationName);
@@ -196,7 +219,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
         if(cohorts != null) {
             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
-                    returnFuture);
+                    returnFuture, callback);
         } else {
             buildCohortList().onComplete(new OnComplete<Void>() {
                 @Override
@@ -213,7 +236,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                         }
                     } else {
                         finishVoidOperation(operationName, message, expectedResponseClass,
-                                propagateException, returnFuture);
+                                propagateException, returnFuture, callback);
                     }
                 }
             }, actorContext.getActorSystem().dispatcher());
@@ -223,11 +246,14 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     }
 
     private void finishVoidOperation(final String operationName, final Object message,
-            final Class<?> expectedResponseClass, final boolean propagateException,
-            final SettableFuture<Void> returnFuture) {
+                                     final Class<?> expectedResponseClass, final boolean propagateException,
+                                     final SettableFuture<Void> returnFuture, final OperationCallback callback) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {} finish {}", transactionId, operationName);
         }
+
+        callback.run();
+
         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
 
         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@@ -247,6 +273,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
 
                 if(exceptionToPropagate != null) {
+
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
                             operationName, exceptionToPropagate);
@@ -265,11 +292,16 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                         }
                         returnFuture.set(null);
                     }
+
+                    callback.failure();
                 } else {
+
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
                     }
                     returnFuture.set(null);
+
+                    callback.success();
                 }
             }
         }, actorContext.getActorSystem().dispatcher());
@@ -279,4 +311,58 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     List<Future<ActorSelection>> getCohortFutures() {
         return Collections.unmodifiableList(cohortFutures);
     }
+
+    private static interface OperationCallback {
+        void run();
+        void success();
+        void failure();
+    }
+
+    private static class CommitCallback implements OperationCallback{
+
+        private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
+        private static final String COMMIT = "commit";
+
+        private final Timer commitTimer;
+        private final ActorContext actorContext;
+        private Timer.Context timerContext;
+
+        CommitCallback(ActorContext actorContext){
+            this.actorContext = actorContext;
+            commitTimer = actorContext.getOperationTimer(COMMIT);
+        }
+
+        @Override
+        public void run() {
+            timerContext = commitTimer.time();
+        }
+
+        @Override
+        public void success() {
+            timerContext.stop();
+
+            Snapshot timerSnapshot = commitTimer.getSnapshot();
+            double allowedLatencyInNanos = timerSnapshot.get98thPercentile();
+
+            long commitTimeoutInSeconds = actorContext.getDatastoreContext()
+                    .getShardTransactionCommitTimeoutInSeconds();
+            long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+            // Here we are trying to find out how many transactions per second are allowed
+            double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
+
+            LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
+                    actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
+
+            actorContext.setTxCreationLimit(newRateLimit);
+        }
+
+        @Override
+        public void failure() {
+            // This would mean we couldn't get a transaction completed in 30 seconds which is
+            // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
+            // not going to be useful - so we leave it as it is
+        }
+    }
+
 }
index 87959efe8ae2def5684e253f2e0840c7177db838..ee3a5cc82573d6e415a5bbcd080277673e4cb051 100644 (file)
@@ -104,11 +104,13 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        actorContext.acquireTxCreationPermit();
         return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        actorContext.acquireTxCreationPermit();
         return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
index c9fdf389311f73c70ca2e0f16dae8e86b7cc0a05..7138a4a6e7b3bc05e610ad323c96ea13eae10281 100644 (file)
@@ -18,9 +18,13 @@ import akka.actor.PoisonPill;
 import akka.dispatch.Mapper;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.RateLimiter;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -54,11 +58,11 @@ import scala.concurrent.duration.FiniteDuration;
  * but should not be passed to actors especially remote actors
  */
 public class ActorContext {
-    private static final Logger
-        LOG = LoggerFactory.getLogger(ActorContext.class);
-
-    public static final String MAILBOX = "bounded-mailbox";
-
+    private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
+    private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+    private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
+    private static final String METRIC_RATE = "rate";
+    private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
                                                               new Mapper<Throwable, Throwable>() {
         @Override
@@ -74,36 +78,47 @@ public class ActorContext {
             return actualFailure;
         }
     };
+    public static final String MAILBOX = "bounded-mailbox";
 
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
     private final DatastoreContext datastoreContext;
-    private volatile SchemaContext schemaContext;
+    private final String dataStoreType;
     private final FiniteDuration operationDuration;
     private final Timeout operationTimeout;
     private final String selfAddressHostPort;
+    private final RateLimiter txRateLimiter;
+    private final MetricRegistry metricRegistry = new MetricRegistry();
+    private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
+    private final Timeout transactionCommitOperationTimeout;
+
+    private volatile SchemaContext schemaContext;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
         this(actorSystem, shardManager, clusterWrapper, configuration,
-                DatastoreContext.newBuilder().build());
+                DatastoreContext.newBuilder().build(), UNKNOWN_DATA_STORE_TYPE);
     }
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration,
-            DatastoreContext datastoreContext) {
+            DatastoreContext datastoreContext, String dataStoreType) {
         this.actorSystem = actorSystem;
         this.shardManager = shardManager;
         this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
+        this.dataStoreType = dataStoreType;
+        this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
 
-        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
-                TimeUnit.SECONDS);
+        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
         operationTimeout = new Timeout(operationDuration);
+        transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+                TimeUnit.SECONDS));
+
 
         Address selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
@@ -113,6 +128,7 @@ public class ActorContext {
         }
 
         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
+        jmxReporter.start();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -446,4 +462,59 @@ public class ActorContext {
     public int getTransactionOutstandingOperationLimit(){
         return transactionOutstandingOperationLimit;
     }
+
+    /**
+     * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
+     * us to create a timer for pretty much anything.
+     *
+     * @param operationName
+     * @return
+     */
+    public Timer getOperationTimer(String operationName){
+        final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName, METRIC_RATE);
+        return metricRegistry.timer(rate);
+    }
+
+    /**
+     * Get the type of the data store to which this ActorContext belongs
+     *
+     * @return
+     */
+    public String getDataStoreType() {
+        return dataStoreType;
+    }
+
+    /**
+     * Set the number of transaction creation permits that are to be allowed
+     *
+     * @param permitsPerSecond
+     */
+    public void setTxCreationLimit(double permitsPerSecond){
+        txRateLimiter.setRate(permitsPerSecond);
+    }
+
+    /**
+     * Get the current transaction creation rate limit
+     * @return
+     */
+    public double getTxCreationLimit(){
+        return txRateLimiter.getRate();
+    }
+
+    /**
+     * Try to acquire a transaction creation permit. Will block if no permits are available.
+     */
+    public void acquireTxCreationPermit(){
+        txRateLimiter.acquire();
+    }
+
+    /**
+     * Return the operation timeout to be used when committing transactions
+     * @return
+     */
+    public Timeout getTransactionCommitOperationTimeout(){
+        return transactionCommitOperationTimeout;
+    }
+
+
 }
index 711c6a37b5c8e9aff85dd91db8d5041be67e8d4c..253422122d57502b9a54724b0d00d69b5a71b83e 100644 (file)
@@ -67,6 +67,7 @@ public class DistributedConfigDataStoreProviderModule extends
                 .shardIsolatedLeaderCheckIntervalInMillis(
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+                .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
index d9df06df1c852e873225b670df7e3fd352213ef9..d41b83d0d2e679c21daa695deef64a52cf94c245 100644 (file)
@@ -67,6 +67,7 @@ public class DistributedOperationalDataStoreProviderModule extends
                 .shardIsolatedLeaderCheckIntervalInMillis(
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+                .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance("operational",
index 46cd50d0c158b6a7e316364a588d28f8364fffce..e2ee7373d0cfd3c4a68ec98a221fd1a121f4f13c 100644 (file)
@@ -180,6 +180,14 @@ module distributed-datastore-provider {
             description "The interval at which the leader of the shard will check if its majority
                         followers are active and term itself as isolated";
         }
+
+        leaf tx-creation-initial-rate-limit {
+            default 100;
+            type non-zero-uint32-type;
+            description "The initial number of transactions per second that are allowed before the data store
+                         should begin applying back pressure. This number is only used as an initial guidance,
+                         subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit";
+        }
     }
 
     // Augments the 'configuration' choice node under modules/module.
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
new file mode 100644 (file)
index 0000000..66fa876
--- /dev/null
@@ -0,0 +1,60 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class DistributedDataStoreTest extends AbstractActorTest {
+
+    private SchemaContext schemaContext;
+
+    @Mock
+    private ActorContext actorContext;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        schemaContext = TestModel.createTestContext();
+
+        doReturn(schemaContext).when(actorContext).getSchemaContext();
+    }
+
+    @Test
+    public void testRateLimitingUsedInReadWriteTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newReadWriteTransaction();
+
+        verify(actorContext, times(1)).acquireTxCreationPermit();
+    }
+
+    @Test
+    public void testRateLimitingUsedInWriteOnlyTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newWriteOnlyTransaction();
+
+        verify(actorContext, times(1)).acquireTxCreationPermit();
+    }
+
+
+    @Test
+    public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newReadOnlyTransaction();
+        distributedDataStore.newReadOnlyTransaction();
+        distributedDataStore.newReadOnlyTransaction();
+
+        verify(actorContext, times(0)).acquireTxCreationPermit();
+    }
+
+}
\ No newline at end of file
index 75c93dd5d2fd2de9521d7540da7169fe656c71e9..d2396e0524f340844ea5f7c0e35dfd6cad0b4806 100644 (file)
@@ -3,14 +3,20 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
@@ -43,11 +49,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Mock
     private ActorContext actorContext;
 
+    @Mock
+    private DatastoreContext datastoreContext;
+
+    @Mock
+    private Timer commitTimer;
+
+    @Mock
+    private Timer.Context commitTimerContext;
+
+    @Mock
+    private Snapshot commitSnapshot;
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
         doReturn(getSystem()).when(actorContext).getActorSystem();
+        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+        doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+        doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+        doReturn(commitTimerContext).when(commitTimer).time();
+        doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+        doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+        doReturn(10.0).when(actorContext).getTxCreationLimit();
     }
 
     private Future<ActorSelection> newCohort() {
@@ -86,12 +111,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         }
 
         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
-                isA(requestType));
+                isA(requestType), any(Timeout.class));
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
         verify(actorContext, times(nCohorts)).executeOperationAsync(
-                any(ActorSelection.class), isA(requestType));
+                any(ActorSelection.class), isA(requestType), any(Timeout.class));
     }
 
     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
@@ -276,8 +301,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         try {
             propagateExecutionExceptionCause(proxy.commit());
         } finally {
+
+            verify(actorContext, never()).setTxCreationLimit(anyLong());
             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
         }
+
     }
 
     @Test
@@ -294,11 +322,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
                 new CommitTransactionReply(), new CommitTransactionReply());
 
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
         proxy.canCommit().get(5, TimeUnit.SECONDS);
         proxy.preCommit().get(5, TimeUnit.SECONDS);
         proxy.commit().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+
+        // Verify that the creation limit was changed to 0.5 (based on setup)
+        verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
+    }
+
+    @Test
+    public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        proxy.commit().get(5, TimeUnit.SECONDS);
+
+        verify(actorContext, never()).setTxCreationLimit(anyLong());
     }
 }
index dd37371a4510c622b4ee0fdb3b5ab5cf67b5bf7f..23c3a82a38255e9bfff2708583c4370ccb6ccf9a 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
@@ -29,10 +32,17 @@ public class TransactionChainProxyTest extends AbstractActorTest{
     ActorContext actorContext = null;
     SchemaContext schemaContext = mock(SchemaContext.class);
 
+    @Mock
+    ActorContext mockActorContext;
+
     @Before
     public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
         actorContext = new MockActorContext(getSystem());
         actorContext.setSchemaContext(schemaContext);
+
+        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
     }
 
     @SuppressWarnings("resource")
@@ -76,4 +86,32 @@ public class TransactionChainProxyTest extends AbstractActorTest{
 
         Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
     }
+
+    @Test
+    public void testRateLimitingUsedInReadWriteTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newReadWriteTransaction();
+
+        verify(mockActorContext, times(1)).acquireTxCreationPermit();
+    }
+
+    @Test
+    public void testRateLimitingUsedInWriteOnlyTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newWriteOnlyTransaction();
+
+        verify(mockActorContext, times(1)).acquireTxCreationPermit();
+    }
+
+
+    @Test
+    public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newReadOnlyTransaction();
+
+        verify(mockActorContext, times(0)).acquireTxCreationPermit();
+    }
 }
index e4ab969f5c4351c0e5b3894d3d3115aa6322337a..16db2be185e00ff11b9c31a0ebc9dac5829c75a3 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -12,10 +13,12 @@ import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.time.StopWatch;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
@@ -265,4 +268,34 @@ public class ActorContextTest extends AbstractActorTest{
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testRateLimiting(){
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext, "config");
+
+        // Check that the initial value is being picked up from DataStoreContext
+        assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+
+        actorContext.setTxCreationLimit(1.0);
+
+        assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
+
+
+        StopWatch watch = new StopWatch();
+
+        watch.start();
+
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+
+        watch.stop();
+
+        assertTrue("did not take as much time as expected", watch.getTime() > 1000);
+    }
 }