Merge topic 'bug/2676'
author Tom Pantelis <tpanteli@brocade.com>
Wed, 11 Feb 2015 22:00:53 +0000 (22:00 +0000)
committer Gerrit Code Review <gerrit@opendaylight.org>
Wed, 11 Feb 2015 22:00:53 +0000 (22:00 +0000)
* changes:
  Store the datastore type in dataStoreContext
  Refactor DataStoreContext
  BUG 2676 : Apply backpressure when creating transactions
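
In outline, the backpressure change gates writable-transaction creation behind a Guava RateLimiter and re-tunes that limiter from measured commit latency. A minimal, self-contained sketch of the idea (class and method names here are illustrative, not the controller API):

    import com.google.common.util.concurrent.RateLimiter;
    import java.util.concurrent.TimeUnit;

    // Illustrative sketch only: every writable-transaction creation must acquire a permit,
    // and the permitted rate is recomputed from observed commit latency so that slow
    // commits throttle the callers that create new transactions.
    public class TxBackpressureSketch {
        private final RateLimiter txRateLimiter = RateLimiter.create(100); // initial tx/sec

        public void beforeCreateTransaction() {
            txRateLimiter.acquire(); // blocks when transactions are created faster than the limit
        }

        public void afterCommitMeasured(double latency98thPercentileNanos, long commitTimeoutSeconds) {
            double commitTimeoutNanos = TimeUnit.SECONDS.toNanos(commitTimeoutSeconds);
            // Same shape as the formula applied after a successful commit: the number of
            // transactions per second that keeps p98 commit latency inside the commit timeout.
            txRateLimiter.setRate((commitTimeoutNanos / latency98thPercentileNanos) / commitTimeoutSeconds);
        }
    }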

18 files changed:
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java

index d6030ea45720b1f7071bf88b7d7bf9bde4314c21..e27546f5dcbabb5c3dec601b19a5b358f26518ee 100644 (file)
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-impl</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
   </dependencies>
 
   <build>
index 01e42dbb8e92400b7f4af73d55257d6f9ebbe4a2..cee781fb88e535a04251f66b948e1d1123169a5c 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.util.Timeout;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.text.WordUtils;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
 import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
@@ -25,37 +26,43 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class DatastoreContext {
 
-    private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
-    private final Duration shardTransactionIdleTimeout;
-    private final int operationTimeoutInSeconds;
-    private final String dataStoreMXBeanType;
-    private final ConfigParams shardRaftConfig;
-    private final int shardTransactionCommitTimeoutInSeconds;
-    private final int shardTransactionCommitQueueCapacity;
-    private final Timeout shardInitializationTimeout;
-    private final Timeout shardLeaderElectionTimeout;
-    private final boolean persistent;
-    private final ConfigurationReader configurationReader;
-    private final long shardElectionTimeoutFactor;
-
-    private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
-            ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds,
-            Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds,
-            int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout,
-            Timeout shardLeaderElectionTimeout,
-            boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor) {
-        this.dataStoreProperties = dataStoreProperties;
-        this.shardRaftConfig = shardRaftConfig;
-        this.dataStoreMXBeanType = dataStoreMXBeanType;
-        this.operationTimeoutInSeconds = operationTimeoutInSeconds;
-        this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
-        this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
-        this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
-        this.shardInitializationTimeout = shardInitializationTimeout;
-        this.shardLeaderElectionTimeout = shardLeaderElectionTimeout;
-        this.persistent = persistent;
-        this.configurationReader = configurationReader;
-        this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+    public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
+    public static final int DEFAULT_OPERATION_TIMEOUT_IN_SECONDS = 5;
+    public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
+    public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000;
+    public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
+    public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
+    public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
+    public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 20000;
+    public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
+    public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
+    public static final boolean DEFAULT_PERSISTENT = true;
+    public static final FileConfigurationReader DEFAULT_CONFIGURATION_READER = new FileConfigurationReader();
+    public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
+    public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
+    public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
+    public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+
+    private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+    private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
+    private int operationTimeoutInSeconds = DEFAULT_OPERATION_TIMEOUT_IN_SECONDS;
+    private String dataStoreMXBeanType;
+    private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
+    private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
+    private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
+    private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+    private boolean persistent = DEFAULT_PERSISTENT;
+    private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
+    private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
+    private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+    private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
+
+    private DatastoreContext(){
+        setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
+        setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
+        setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
+        setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
+        setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
     }
 
     public static Builder newBuilder() {
@@ -79,7 +86,7 @@ public class DatastoreContext {
     }
 
     public ConfigParams getShardRaftConfig() {
-        return shardRaftConfig;
+        return raftConfig;
     }
 
     public int getShardTransactionCommitTimeoutInSeconds() {
@@ -107,125 +114,140 @@ public class DatastoreContext {
     }
 
     public long getShardElectionTimeoutFactor(){
-        return this.shardElectionTimeoutFactor;
+        return raftConfig.getElectionTimeoutFactor();
+    }
+
+    public String getDataStoreType(){
+        return dataStoreType;
+    }
+
+    public long getTransactionCreationInitialRateLimit() {
+        return transactionCreationInitialRateLimit;
+    }
+
+    private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
+        raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
+                TimeUnit.MILLISECONDS));
+    }
+
+    private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){
+        raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
+    }
+
+
+    private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
+        raftConfig.setIsolatedLeaderCheckInterval(
+                new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
+    }
+
+    private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+        raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
+    }
+
+    private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+        raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
+    }
+
+    private void setSnapshotBatchCount(int shardSnapshotBatchCount) {
+        raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
     public static class Builder {
-        private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
-        private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
-        private int operationTimeoutInSeconds = 5;
-        private String dataStoreMXBeanType;
-        private int shardTransactionCommitTimeoutInSeconds = 30;
-        private int shardJournalRecoveryLogBatchSize = 1000;
-        private int shardSnapshotBatchCount = 20000;
-        private int shardHeartbeatIntervalInMillis = 500;
-        private int shardTransactionCommitQueueCapacity = 20000;
-        private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES);
-        private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS);
-        private boolean persistent = true;
-        private ConfigurationReader configurationReader = new FileConfigurationReader();
-        private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
-        private int shardSnapshotDataThresholdPercentage = 12;
-        private long shardElectionTimeoutFactor = 2;
+        private DatastoreContext datastoreContext = new DatastoreContext();
 
         public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
-            this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+            datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
             return this;
         }
 
         public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
-            this.operationTimeoutInSeconds = operationTimeoutInSeconds;
+            datastoreContext.operationTimeoutInSeconds = operationTimeoutInSeconds;
             return this;
         }
 
         public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
-            this.dataStoreMXBeanType = dataStoreMXBeanType;
+            datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
             return this;
         }
 
         public Builder dataStoreProperties(InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
-            this.dataStoreProperties = dataStoreProperties;
+            datastoreContext.dataStoreProperties = dataStoreProperties;
             return this;
         }
 
         public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
-            this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
+            datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
             return this;
         }
 
         public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
-            this.shardJournalRecoveryLogBatchSize = shardJournalRecoveryLogBatchSize;
+            datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
             return this;
         }
 
         public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
-            this.shardSnapshotBatchCount = shardSnapshotBatchCount;
+            datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
             return this;
         }
 
         public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
-            this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage;
+            datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
             return this;
         }
 
-
         public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
-            this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
+            datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
             return this;
         }
 
         public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
-            this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
+            datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
             return this;
         }
 
         public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
-            this.shardInitializationTimeout = new Timeout(timeout, unit);
+            datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
             return this;
         }
 
         public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
-            this.shardLeaderElectionTimeout = new Timeout(timeout, unit);
+            datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
             return this;
         }
 
         public Builder configurationReader(ConfigurationReader configurationReader){
-            this.configurationReader = configurationReader;
+            datastoreContext.configurationReader = configurationReader;
             return this;
         }
 
         public Builder persistent(boolean persistent){
-            this.persistent = persistent;
+            datastoreContext.persistent = persistent;
             return this;
         }
 
         public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
-            this.shardIsolatedLeaderCheckIntervalInMillis = shardIsolatedLeaderCheckIntervalInMillis;
+            datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
             return this;
         }
 
         public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){
-            this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+            datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
             return this;
         }
 
+        public Builder transactionCreationInitialRateLimit(long initialRateLimit){
+            datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
+            return this;
+        }
 
-        public DatastoreContext build() {
-            DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
-            raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
-                    TimeUnit.MILLISECONDS));
-            raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
-            raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
-            raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
-            raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
-            raftConfig.setIsolatedLeaderCheckInterval(
-                new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
+        public Builder dataStoreType(String dataStoreType){
+            datastoreContext.dataStoreType = dataStoreType;
+            datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore";
+            return this;
+        }
 
-            return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
-                    operationTimeoutInSeconds, shardTransactionIdleTimeout,
-                    shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity,
-                    shardInitializationTimeout, shardLeaderElectionTimeout,
-                    persistent, configurationReader, shardElectionTimeoutFactor);
+        public DatastoreContext build() {
+            return datastoreContext;
         }
     }
 }
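
With the refactor above, the Builder methods mutate a single pre-defaulted DatastoreContext and build() simply returns that instance, so callers override only what they need. A usage fragment modelled on the config-subsystem modules later in this change (the values shown are the documented defaults, listed only for illustration):

    DatastoreContext context = DatastoreContext.newBuilder()
            .dataStoreType("operational")                // also derives the "DistributedOperationalDatastore" MXBean type
            .shardTransactionCommitTimeoutInSeconds(30)  // default, shown explicitly
            .transactionCreationInitialRateLimit(100)    // initial tx/sec before the limit auto-adjusts
            .build();                                    // returns the mutated instance directly
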
index 930c5f7257dc6c061f23f54868a55f8da68b4acb..107c959112fbc34398ae48b144752a8b28c99e41 100644 (file)
@@ -39,20 +39,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
     private final ActorContext actorContext;
 
-    public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
+    public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
             Configuration configuration, DatastoreContext datastoreContext) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
-        Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
 
+        String type = datastoreContext.getDataStoreType();
+
         String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
         actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
-                ShardManager.props(type, cluster, configuration, datastoreContext)
+                ShardManager.props(cluster, configuration, datastoreContext)
                     .withMailbox(ActorContext.MAILBOX), shardManagerId ),
                 cluster, configuration, datastoreContext);
     }
@@ -94,11 +95,13 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        actorContext.acquireTxCreationPermit();
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        actorContext.acquireTxCreationPermit();
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
     }
 
index 5d63c92e885824c701c4cd8d6f1702dbae84da5f..a9a735ede78de6e31e92fb2f638e6a793e1f2480 100644 (file)
@@ -22,13 +22,13 @@ public class DistributedDataStoreFactory {
 
     private static volatile ActorSystem persistentActorSystem = null;
 
-    public static DistributedDataStore createInstance(String name, SchemaService schemaService,
+    public static DistributedDataStore createInstance(SchemaService schemaService,
                                                       DatastoreContext datastoreContext, BundleContext bundleContext) {
 
         ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader());
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore =
-                new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+                new DistributedDataStore(actorSystem, new ClusterWrapperImpl(actorSystem),
                         config, datastoreContext);
 
         ShardStrategyFactory.setConfiguration(config);
index 9c8f0b24440775a68f6047e3be19210583e66dbb..3dbac003b9d66b74520d431c2d7ebb45da488fbc 100644 (file)
@@ -96,17 +96,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private final DataPersistenceProvider dataPersistenceProvider;
 
     /**
-     * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
-     *             configuration or operational
      */
-    protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+    protected ShardManager(ClusterWrapper cluster, Configuration configuration,
             DatastoreContext datastoreContext) {
 
-        this.type = Preconditions.checkNotNull(type, "type should not be null");
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
         this.datastoreContext = datastoreContext;
         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
+        this.type = datastoreContext.getDataStoreType();
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -118,16 +116,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
     }
 
-    public static Props props(final String type,
+    public static Props props(
         final ClusterWrapper cluster,
         final Configuration configuration,
         final DatastoreContext datastoreContext) {
 
-        Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
 
-        return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
     }
 
     @Override
@@ -529,14 +526,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private static class ShardManagerCreator implements Creator<ShardManager> {
         private static final long serialVersionUID = 1L;
 
-        final String type;
         final ClusterWrapper cluster;
         final Configuration configuration;
         final DatastoreContext datastoreContext;
 
-        ShardManagerCreator(String type, ClusterWrapper cluster,
+        ShardManagerCreator(ClusterWrapper cluster,
                 Configuration configuration, DatastoreContext datastoreContext) {
-            this.type = type;
             this.cluster = cluster;
             this.configuration = configuration;
             this.datastoreContext = datastoreContext;
@@ -544,7 +539,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(type, cluster, configuration, datastoreContext);
+            return new ShardManager(cluster, configuration, datastoreContext);
         }
     }
 
index 932c36fe3469f00b5c1d5ed6b38706f48783117f..4f472266c1f56acbd8fc531ae7189f1ae91951b4 100644 (file)
@@ -11,12 +11,15 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -44,6 +47,19 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     private final List<Future<ActorSelection>> cohortFutures;
     private volatile List<ActorSelection> cohorts;
     private final String transactionId;
+    private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+        @Override
+        public void run() {
+        }
+
+        @Override
+        public void success() {
+        }
+
+        @Override
+        public void failure() {
+        }
+    };
 
     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
             List<Future<ActorSelection>> cohortFutures, String transactionId) {
@@ -151,8 +167,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
             }
-
-            futureList.add(actorContext.executeOperationAsync(cohort, message));
+            futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
         }
 
         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
@@ -179,12 +194,20 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     @Override
     public ListenableFuture<Void> commit() {
-        return voidOperation("commit",  new CommitTransaction(transactionId).toSerializable(),
-                CommitTransactionReply.SERIALIZABLE_CLASS, true);
+        OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
+                new CommitCallback(actorContext);
+
+        return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
+                CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
+    }
+
+    private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
+                                                 final Class<?> expectedResponseClass, final boolean propagateException) {
+        return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
     }
 
     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
-            final Class<?> expectedResponseClass, final boolean propagateException) {
+                                                 final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
 
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {} {}", transactionId, operationName);
@@ -196,7 +219,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
         if(cohorts != null) {
             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
-                    returnFuture);
+                    returnFuture, callback);
         } else {
             buildCohortList().onComplete(new OnComplete<Void>() {
                 @Override
@@ -213,7 +236,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                         }
                     } else {
                         finishVoidOperation(operationName, message, expectedResponseClass,
-                                propagateException, returnFuture);
+                                propagateException, returnFuture, callback);
                     }
                 }
             }, actorContext.getActorSystem().dispatcher());
@@ -223,11 +246,14 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     }
 
     private void finishVoidOperation(final String operationName, final Object message,
-            final Class<?> expectedResponseClass, final boolean propagateException,
-            final SettableFuture<Void> returnFuture) {
+                                     final Class<?> expectedResponseClass, final boolean propagateException,
+                                     final SettableFuture<Void> returnFuture, final OperationCallback callback) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {} finish {}", transactionId, operationName);
         }
+
+        callback.run();
+
         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
 
         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@@ -247,6 +273,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
 
                 if(exceptionToPropagate != null) {
+
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
                             operationName, exceptionToPropagate);
@@ -265,11 +292,16 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                         }
                         returnFuture.set(null);
                     }
+
+                    callback.failure();
                 } else {
+
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
                     }
                     returnFuture.set(null);
+
+                    callback.success();
                 }
             }
         }, actorContext.getActorSystem().dispatcher());
@@ -279,4 +311,58 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     List<Future<ActorSelection>> getCohortFutures() {
         return Collections.unmodifiableList(cohortFutures);
     }
+
+    private static interface OperationCallback {
+        void run();
+        void success();
+        void failure();
+    }
+
+    private static class CommitCallback implements OperationCallback{
+
+        private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
+        private static final String COMMIT = "commit";
+
+        private final Timer commitTimer;
+        private final ActorContext actorContext;
+        private Timer.Context timerContext;
+
+        CommitCallback(ActorContext actorContext){
+            this.actorContext = actorContext;
+            commitTimer = actorContext.getOperationTimer(COMMIT);
+        }
+
+        @Override
+        public void run() {
+            timerContext = commitTimer.time();
+        }
+
+        @Override
+        public void success() {
+            timerContext.stop();
+
+            Snapshot timerSnapshot = commitTimer.getSnapshot();
+            double allowedLatencyInNanos = timerSnapshot.get98thPercentile();
+
+            long commitTimeoutInSeconds = actorContext.getDatastoreContext()
+                    .getShardTransactionCommitTimeoutInSeconds();
+            long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+            // Here we are trying to find out how many transactions per second are allowed
+            double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
+
+            LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
+                    actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
+
+            actorContext.setTxCreationLimit(newRateLimit);
+        }
+
+        @Override
+        public void failure() {
+            // This would mean we couldn't get a transaction completed in 30 seconds which is
+            // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
+            // not going to be useful - so we leave it as it is
+        }
+    }
+
 }
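
To make the rate calculation in CommitCallback.success() concrete, a worked example using the default 30-second shard commit timeout: if the 98th-percentile commit latency is 2 seconds, the new limit becomes (30e9 ns / 2e9 ns) / 30 s = 0.5 transactions per second (ThreePhaseCommitCohortProxyTest below drives the same formula with a 100-second timeout and also arrives at 0.5):

    long commitTimeoutInSeconds = 30;                                   // DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS
    double commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
    double allowedLatencyInNanos = TimeUnit.MILLISECONDS.toNanos(2000); // example p98 commit latency
    double newRateLimit = (commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
    // newRateLimit = (3.0e10 / 2.0e9) / 30 = 15 / 30 = 0.5 tx/sec
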
index 87959efe8ae2def5684e253f2e0840c7177db838..ee3a5cc82573d6e415a5bbcd080277673e4cb051 100644 (file)
@@ -104,11 +104,13 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        actorContext.acquireTxCreationPermit();
         return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        actorContext.acquireTxCreationPermit();
         return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
index c9fdf389311f73c70ca2e0f16dae8e86b7cc0a05..cb06c898fd1940743c19b1763f2b173ec3dacbfc 100644 (file)
@@ -18,9 +18,13 @@ import akka.actor.PoisonPill;
 import akka.dispatch.Mapper;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.RateLimiter;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -54,11 +58,11 @@ import scala.concurrent.duration.FiniteDuration;
  * but should not be passed to actors especially remote actors
  */
 public class ActorContext {
-    private static final Logger
-        LOG = LoggerFactory.getLogger(ActorContext.class);
-
-    public static final String MAILBOX = "bounded-mailbox";
-
+    private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
+    private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+    private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
+    private static final String METRIC_RATE = "rate";
+    private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
                                                               new Mapper<Throwable, Throwable>() {
         @Override
@@ -74,17 +78,23 @@ public class ActorContext {
             return actualFailure;
         }
     };
+    public static final String MAILBOX = "bounded-mailbox";
 
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
     private final DatastoreContext datastoreContext;
-    private volatile SchemaContext schemaContext;
     private final FiniteDuration operationDuration;
     private final Timeout operationTimeout;
     private final String selfAddressHostPort;
+    private final RateLimiter txRateLimiter;
+    private final MetricRegistry metricRegistry = new MetricRegistry();
+    private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
+    private final Timeout transactionCommitOperationTimeout;
+
+    private volatile SchemaContext schemaContext;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -100,10 +110,13 @@ public class ActorContext {
         this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
+        this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
 
-        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
-                TimeUnit.SECONDS);
+        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
         operationTimeout = new Timeout(operationDuration);
+        transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+                TimeUnit.SECONDS));
+
 
         Address selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
@@ -113,6 +126,7 @@ public class ActorContext {
         }
 
         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
+        jmxReporter.start();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -446,4 +460,59 @@ public class ActorContext {
     public int getTransactionOutstandingOperationLimit(){
         return transactionOutstandingOperationLimit;
     }
+
+    /**
+     * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
+     * us to create a timer for pretty much anything.
+     *
+     * @param operationName
+     * @return
+     */
+    public Timer getOperationTimer(String operationName){
+        final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
+        return metricRegistry.timer(rate);
+    }
+
+    /**
+     * Get the type of the data store to which this ActorContext belongs
+     *
+     * @return
+     */
+    public String getDataStoreType() {
+        return datastoreContext.getDataStoreType();
+    }
+
+    /**
+     * Set the number of transaction creation permits that are to be allowed
+     *
+     * @param permitsPerSecond
+     */
+    public void setTxCreationLimit(double permitsPerSecond){
+        txRateLimiter.setRate(permitsPerSecond);
+    }
+
+    /**
+     * Get the current transaction creation rate limit
+     * @return
+     */
+    public double getTxCreationLimit(){
+        return txRateLimiter.getRate();
+    }
+
+    /**
+     * Try to acquire a transaction creation permit. Will block if no permits are available.
+     */
+    public void acquireTxCreationPermit(){
+        txRateLimiter.acquire();
+    }
+
+    /**
+     * Return the operation timeout to be used when committing transactions
+     * @return
+     */
+    public Timeout getTransactionCommitOperationTimeout(){
+        return transactionCommitOperationTimeout;
+    }
+
+
 }
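
The metrics additions to ActorContext register one Dropwizard Timer per operation in a MetricRegistry and publish the timers over JMX via JmxReporter. A sketch of how a caller would time an operation against such a registry, mirroring what CommitCallback does (the registry name and operation string follow getOperationTimer(); the wrapper class itself is illustrative):

    import com.codahale.metrics.JmxReporter;
    import com.codahale.metrics.MetricRegistry;
    import com.codahale.metrics.Timer;

    public class OperationTimingSketch {
        private final MetricRegistry registry = new MetricRegistry();
        private final JmxReporter reporter = JmxReporter.forRegistry(registry)
                .inDomain("org.opendaylight.controller.cluster.datastore").build();

        public OperationTimingSketch() {
            reporter.start(); // expose timers as MBeans, as the ActorContext constructor now does
        }

        public void timeCommit(Runnable commit) {
            // Same naming scheme as getOperationTimer(): distributed-data-store.<dataStoreType>.<operation>.rate
            Timer timer = registry.timer(
                    MetricRegistry.name("distributed-data-store", "operational", "commit", "rate"));
            Timer.Context ctx = timer.time();
            try {
                commit.run();
            } finally {
                ctx.stop(); // records elapsed time into the timer's histogram (feeds get98thPercentile())
            }
        }
    }
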
index 711c6a37b5c8e9aff85dd91db8d5041be67e8d4c..7e8307465b9818d3c886098219065c9276b77e65 100644 (file)
@@ -41,7 +41,7 @@ public class DistributedConfigDataStoreProviderModule extends
         }
 
         DatastoreContext datastoreContext = DatastoreContext.newBuilder()
-                .dataStoreMXBeanType("DistributedConfigDatastore")
+                .dataStoreType("config")
                 .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
                         props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
                         props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
@@ -67,9 +67,10 @@ public class DistributedConfigDataStoreProviderModule extends
                 .shardIsolatedLeaderCheckIntervalInMillis(
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+                .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
                 .build();
 
-        return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+        return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
                 datastoreContext, bundleContext);
     }
 
index d9df06df1c852e873225b670df7e3fd352213ef9..0655468531a16fe7a9ad2032a5f56d6f948044e3 100644 (file)
@@ -41,7 +41,7 @@ public class DistributedOperationalDataStoreProviderModule extends
         }
 
         DatastoreContext datastoreContext = DatastoreContext.newBuilder()
-                .dataStoreMXBeanType("DistributedOperationalDatastore")
+                .dataStoreType("operational")
                 .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
                         props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
                         props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
@@ -67,10 +67,11 @@ public class DistributedOperationalDataStoreProviderModule extends
                 .shardIsolatedLeaderCheckIntervalInMillis(
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+                .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
                 .build();
 
-        return DistributedDataStoreFactory.createInstance("operational",
-                getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
+        return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
+                datastoreContext, bundleContext);
     }
 
     public void setBundleContext(BundleContext bundleContext) {
index 46cd50d0c158b6a7e316364a588d28f8364fffce..e2ee7373d0cfd3c4a68ec98a221fd1a121f4f13c 100644 (file)
@@ -180,6 +180,14 @@ module distributed-datastore-provider {
             description "The interval at which the leader of the shard will check if its majority
                         followers are active and term itself as isolated";
         }
+
+        leaf tx-creation-initial-rate-limit {
+            default 100;
+            type non-zero-uint32-type;
+            description "The initial number of transactions per second that are allowed before the data store
+                         should begin applying back pressure. This number is only used as an initial guidance,
+                         subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit";
+        }
     }
 
     // Augments the 'configuration' choice node under modules/module.
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java
new file mode 100644 (file)
index 0000000..3e89823
--- /dev/null
@@ -0,0 +1,37 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DatastoreContextTest {
+
+    private DatastoreContext.Builder builder;
+
+    @Before
+    public void setUp(){
+        builder = new DatastoreContext.Builder();
+    }
+
+    @Test
+    public void testDefaults(){
+        DatastoreContext build = builder.build();
+
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT , build.getShardTransactionIdleTimeout());
+        assertEquals(DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, build.getOperationTimeoutInSeconds());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, build.getShardTransactionCommitTimeoutInSeconds());
+        assertEquals(DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE, build.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+        assertEquals(DatastoreContext.DEFAULT_SNAPSHOT_BATCH_COUNT, build.getShardRaftConfig().getSnapshotBatchCount());
+        assertEquals(DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getHeartBeatInterval().length());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY, build.getShardTransactionCommitQueueCapacity());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT, build.getShardInitializationTimeout());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT, build.getShardLeaderElectionTimeout());
+        assertEquals(DatastoreContext.DEFAULT_PERSISTENT, build.isPersistent());
+        assertEquals(DatastoreContext.DEFAULT_CONFIGURATION_READER, build.getConfigurationReader());
+        assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckInterval().length());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
+        assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());
+    }
+
+}
\ No newline at end of file
index 9f5aded3521b7c72f9b50fff77a988263f1e039d..1ad2be7af17e3511952b31c334db1f9c43762b87 100644 (file)
@@ -790,8 +790,11 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
             ShardStrategyFactory.setConfiguration(config);
 
+            datastoreContextBuilder.dataStoreType(typeName);
+
             DatastoreContext datastoreContext = datastoreContextBuilder.build();
-            DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
+
+            DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
                     config, datastoreContext);
 
             SchemaContext schemaContext = SchemaContextHelper.full();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
new file mode 100644 (file)
index 0000000..66fa876
--- /dev/null
@@ -0,0 +1,60 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class DistributedDataStoreTest extends AbstractActorTest {
+
+    private SchemaContext schemaContext;
+
+    @Mock
+    private ActorContext actorContext;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        schemaContext = TestModel.createTestContext();
+
+        doReturn(schemaContext).when(actorContext).getSchemaContext();
+    }
+
+    @Test
+    public void testRateLimitingUsedInReadWriteTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newReadWriteTransaction();
+
+        verify(actorContext, times(1)).acquireTxCreationPermit();
+    }
+
+    @Test
+    public void testRateLimitingUsedInWriteOnlyTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newWriteOnlyTransaction();
+
+        verify(actorContext, times(1)).acquireTxCreationPermit();
+    }
+
+
+    @Test
+    public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newReadOnlyTransaction();
+        distributedDataStore.newReadOnlyTransaction();
+        distributedDataStore.newReadOnlyTransaction();
+
+        verify(actorContext, times(0)).acquireTxCreationPermit();
+    }
+
+}
\ No newline at end of file
index 8c56efd41325e70911dfa644746803b4a8aae61a..596761ddc8fa9e9d25b4c797c2f814f5a1c63a09 100644 (file)
@@ -1,5 +1,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
@@ -11,6 +16,13 @@ import akka.util.Timeout;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,20 +47,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class ShardManagerTest extends AbstractActorTest {
     private static int ID_COUNTER = 1;
 
@@ -73,8 +71,10 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newShardMgrProps() {
-        return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
-                DatastoreContext.newBuilder().build());
+
+        DatastoreContext.Builder builder = DatastoreContext.newBuilder();
+        builder.dataStoreType(shardMrgIDSuffix);
+        return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build());
     }
 
     @Test
@@ -351,10 +351,8 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRecoveryApplicable(){
         new JavaTestKit(getSystem()) {
             {
-                final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(true).build());
+                final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+                        DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build());
                 final TestActorRef<ShardManager> persistentShardManager =
                         TestActorRef.create(getSystem(), persistentProps);
 
@@ -362,10 +360,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
 
-                final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(false).build());
+                final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+                        DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build());
                 final TestActorRef<ShardManager> nonPersistentShardManager =
                         TestActorRef.create(getSystem(), nonPersistentProps);
 
@@ -386,7 +382,8 @@ public class ShardManagerTest extends AbstractActorTest {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
+                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
+                        DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) {
                     @Override
                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
@@ -426,8 +423,8 @@ public class ShardManagerTest extends AbstractActorTest {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
 
         TestShardManager(String shardMrgIDSuffix) {
-            super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
-                    DatastoreContext.newBuilder().build());
+            super(new MockClusterWrapper(), new MockConfiguration(),
+                    DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build());
         }
 
         @Override
index 75c93dd5d2fd2de9521d7540da7169fe656c71e9..d2396e0524f340844ea5f7c0e35dfd6cad0b4806 100644 (file)
@@ -3,14 +3,20 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
@@ -43,11 +49,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Mock
     private ActorContext actorContext;
 
+    @Mock
+    private DatastoreContext datastoreContext;
+
+    @Mock
+    private Timer commitTimer;
+
+    @Mock
+    private Timer.Context commitTimerContext;
+
+    @Mock
+    private Snapshot commitSnapshot;
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
         doReturn(getSystem()).when(actorContext).getActorSystem();
+        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+        doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+        doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+        doReturn(commitTimerContext).when(commitTimer).time();
+        doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+        doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+        doReturn(10.0).when(actorContext).getTxCreationLimit();
     }
 
     private Future<ActorSelection> newCohort() {
@@ -86,12 +111,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         }
 
         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
-                isA(requestType));
+                isA(requestType), any(Timeout.class));
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
         verify(actorContext, times(nCohorts)).executeOperationAsync(
-                any(ActorSelection.class), isA(requestType));
+                any(ActorSelection.class), isA(requestType), any(Timeout.class));
     }
 
     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
@@ -276,8 +301,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         try {
             propagateExecutionExceptionCause(proxy.commit());
         } finally {
+
+            verify(actorContext, never()).setTxCreationLimit(anyDouble());
             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
         }
+
     }
 
     @Test
@@ -294,11 +322,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
                 new CommitTransactionReply(), new CommitTransactionReply());
 
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
         proxy.canCommit().get(5, TimeUnit.SECONDS);
         proxy.preCommit().get(5, TimeUnit.SECONDS);
         proxy.commit().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+
+        // Verify that the creation limit was changed to 0.5 (based on setup)
+        verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
+    }
+
+    @Test
+    public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        proxy.commit().get(5, TimeUnit.SECONDS);
+
+        verify(actorContext, never()).setTxCreationLimit(anyDouble());
     }
 }
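
A minimal sketch of how a commit proxy could turn the mocked commit-timer statistics above into the 0.5 limit checked by verify(actorContext, timeout(5000)).setTxCreationLimit(0.5). It assumes the limit is derived as "one transaction per observed 98th-percentile commit latency"; the class and method names here are illustrative, not the actual ThreePhaseCommitCohortProxy code.

import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Timer;

final class TxCreationLimitSketch {
    private TxCreationLimitSketch() {
    }

    // With get98thPercentile() mocked to 2 seconds (in nanoseconds), this
    // returns 1e9 / 2e9 = 0.5 permits per second.
    static double fromCommitTimer(Timer commitTimer) {
        double p98Nanos = commitTimer.getSnapshot().get98thPercentile();
        return TimeUnit.SECONDS.toNanos(1) / p98Nanos;
    }
}
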
index dd37371a4510c622b4ee0fdb3b5ab5cf67b5bf7f..23c3a82a38255e9bfff2708583c4370ccb6ccf9a 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
@@ -29,10 +32,17 @@ public class TransactionChainProxyTest extends AbstractActorTest{
     ActorContext actorContext = null;
     SchemaContext schemaContext = mock(SchemaContext.class);
 
+    @Mock
+    ActorContext mockActorContext;
+
     @Before
     public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
         actorContext = new MockActorContext(getSystem());
         actorContext.setSchemaContext(schemaContext);
+
+        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
     }
 
     @SuppressWarnings("resource")
@@ -76,4 +86,32 @@ public class TransactionChainProxyTest extends AbstractActorTest{
 
         Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
     }
+
+    @Test
+    public void testRateLimitingUsedInReadWriteTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newReadWriteTransaction();
+
+        verify(mockActorContext, times(1)).acquireTxCreationPermit();
+    }
+
+    @Test
+    public void testRateLimitingUsedInWriteOnlyTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newWriteOnlyTransaction();
+
+        verify(mockActorContext, times(1)).acquireTxCreationPermit();
+    }
+
+
+    @Test
+    public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newReadOnlyTransaction();
+
+        verify(mockActorContext, times(0)).acquireTxCreationPermit();
+    }
 }
index e4ab969f5c4351c0e5b3894d3d3115aa6322337a..eae46da2eee53bd4b2cf5ee7d2cb823e0111b6be 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -12,10 +13,12 @@ import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.time.StopWatch;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
@@ -265,4 +268,35 @@ public class ActorContextTest extends AbstractActorTest{
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testRateLimiting(){
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext);
+
+        // Check that the initial value is picked up from DatastoreContext
+        assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+
+        actorContext.setTxCreationLimit(1.0);
+
+        assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
+
+
+        StopWatch watch = new StopWatch();
+
+        watch.start();
+
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+
+        watch.stop();
+
+        assertTrue("did not take as much time as expected", watch.getTime() > 1000);
+    }
 }
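
For reference, a minimal sketch of the rate-limiting helpers these tests exercise (getTxCreationLimit, setTxCreationLimit, acquireTxCreationPermit), assuming they are backed by Guava's RateLimiter seeded from DatastoreContext.getTransactionCreationInitialRateLimit(). The method names come from the tests above; the body is an assumption, not the actual ActorContext implementation.

import com.google.common.util.concurrent.RateLimiter;

final class TxRateLimitingSketch {
    private final RateLimiter txRateLimiter;

    TxRateLimitingSketch(long initialRateLimit) {
        // e.g. 155 permits per second, as stubbed in testRateLimiting above
        this.txRateLimiter = RateLimiter.create(initialRateLimit);
    }

    double getTxCreationLimit() {
        return txRateLimiter.getRate();
    }

    void setTxCreationLimit(double limitPerSecond) {
        txRateLimiter.setRate(limitPerSecond);
    }

    void acquireTxCreationPermit() {
        // Blocks until a permit is available; at setTxCreationLimit(1.0) three
        // successive acquires take well over a second, which is what the
        // StopWatch assertion above relies on.
        txRateLimiter.acquire();
    }
}
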