<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-impl</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
</dependencies>
<build>
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.text.WordUtils;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
*/
public class DatastoreContext {
- private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
- private final Duration shardTransactionIdleTimeout;
- private final int operationTimeoutInSeconds;
- private final String dataStoreMXBeanType;
- private final ConfigParams shardRaftConfig;
- private final int shardTransactionCommitTimeoutInSeconds;
- private final int shardTransactionCommitQueueCapacity;
- private final Timeout shardInitializationTimeout;
- private final Timeout shardLeaderElectionTimeout;
- private final boolean persistent;
- private final ConfigurationReader configurationReader;
- private final long shardElectionTimeoutFactor;
-
- private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
- ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds,
- Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds,
- int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout,
- Timeout shardLeaderElectionTimeout,
- boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor) {
- this.dataStoreProperties = dataStoreProperties;
- this.shardRaftConfig = shardRaftConfig;
- this.dataStoreMXBeanType = dataStoreMXBeanType;
- this.operationTimeoutInSeconds = operationTimeoutInSeconds;
- this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
- this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
- this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
- this.shardInitializationTimeout = shardInitializationTimeout;
- this.shardLeaderElectionTimeout = shardLeaderElectionTimeout;
- this.persistent = persistent;
- this.configurationReader = configurationReader;
- this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+ public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
+ public static final int DEFAULT_OPERATION_TIMEOUT_IN_SECONDS = 5;
+ public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
+ public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000;
+ public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
+ public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
+ public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
+ public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 20000;
+ public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
+ public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
+ public static final boolean DEFAULT_PERSISTENT = true;
+ public static final FileConfigurationReader DEFAULT_CONFIGURATION_READER = new FileConfigurationReader();
+ public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
+ public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
+ public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
+ public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+
+ private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+ private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
+ private int operationTimeoutInSeconds = DEFAULT_OPERATION_TIMEOUT_IN_SECONDS;
+ private String dataStoreMXBeanType;
+ private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
+ private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
+ private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
+ private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+ private boolean persistent = DEFAULT_PERSISTENT;
+ private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
+ private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
+ private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+ private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
+
+ // Private: instances are created only through the Builder. The defaults that
+ // live inside the raft config need translation, so they are applied via the
+ // private setters rather than field initializers.
+ private DatastoreContext(){
+ setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
+ setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
+ setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
+ setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
+ setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
}
public static Builder newBuilder() {
}
public ConfigParams getShardRaftConfig() {
- return shardRaftConfig;
+ return raftConfig;
}
public int getShardTransactionCommitTimeoutInSeconds() {
}
public long getShardElectionTimeoutFactor(){
- return this.shardElectionTimeoutFactor;
+ return raftConfig.getElectionTimeoutFactor();
+ }
+
+ /**
+ * Returns the logical type of this data store, e.g. "config" or "operational".
+ * Defaults to UNKNOWN_DATA_STORE_TYPE until set via the Builder.
+ */
+ public String getDataStoreType(){
+ return dataStoreType;
+ }
+
+ /**
+ * Returns the initial transactions-per-second limit used to seed the
+ * transaction-creation rate limiter.
+ */
+ public long getTransactionCreationInitialRateLimit() {
+ return transactionCreationInitialRateLimit;
+ }
+
+ // The private setters below translate plain numeric settings into the raft
+ // config's representation; they are invoked by the constructor defaults and
+ // the Builder.
+
+ private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
+ raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
+ TimeUnit.MILLISECONDS));
+ }
+
+ private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){
+ raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
+ }
+
+
+ private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
+ raftConfig.setIsolatedLeaderCheckInterval(
+ new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
+ }
+
+ private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+ raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
+ }
+
+ private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+ raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
+ }
+
+ private void setSnapshotBatchCount(int shardSnapshotBatchCount) {
+ raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
public static class Builder {
- private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
- private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
- private int operationTimeoutInSeconds = 5;
- private String dataStoreMXBeanType;
- private int shardTransactionCommitTimeoutInSeconds = 30;
- private int shardJournalRecoveryLogBatchSize = 1000;
- private int shardSnapshotBatchCount = 20000;
- private int shardHeartbeatIntervalInMillis = 500;
- private int shardTransactionCommitQueueCapacity = 20000;
- private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES);
- private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS);
- private boolean persistent = true;
- private ConfigurationReader configurationReader = new FileConfigurationReader();
- private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
- private int shardSnapshotDataThresholdPercentage = 12;
- private long shardElectionTimeoutFactor = 2;
+ private DatastoreContext datastoreContext = new DatastoreContext();
public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
- this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+ datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
return this;
}
public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
- this.operationTimeoutInSeconds = operationTimeoutInSeconds;
+ datastoreContext.operationTimeoutInSeconds = operationTimeoutInSeconds;
return this;
}
public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
- this.dataStoreMXBeanType = dataStoreMXBeanType;
+ datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
return this;
}
public Builder dataStoreProperties(InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
- this.dataStoreProperties = dataStoreProperties;
+ datastoreContext.dataStoreProperties = dataStoreProperties;
return this;
}
public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
- this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
+ datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
return this;
}
public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
- this.shardJournalRecoveryLogBatchSize = shardJournalRecoveryLogBatchSize;
+ datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
return this;
}
public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
- this.shardSnapshotBatchCount = shardSnapshotBatchCount;
+ datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
return this;
}
public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
- this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage;
+ datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
return this;
}
-
public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
- this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
+ datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
return this;
}
public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
- this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
+ datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
return this;
}
public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
- this.shardInitializationTimeout = new Timeout(timeout, unit);
+ datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
return this;
}
public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
- this.shardLeaderElectionTimeout = new Timeout(timeout, unit);
+ datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
return this;
}
public Builder configurationReader(ConfigurationReader configurationReader){
- this.configurationReader = configurationReader;
+ datastoreContext.configurationReader = configurationReader;
return this;
}
public Builder persistent(boolean persistent){
- this.persistent = persistent;
+ datastoreContext.persistent = persistent;
return this;
}
public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
- this.shardIsolatedLeaderCheckIntervalInMillis = shardIsolatedLeaderCheckIntervalInMillis;
+ datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
return this;
}
public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){
- this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+ datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
return this;
}
+ public Builder transactionCreationInitialRateLimit(long initialRateLimit){
+ datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
+ return this;
+ }
- public DatastoreContext build() {
- DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
- raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
- TimeUnit.MILLISECONDS));
- raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
- raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
- raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
- raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
- raftConfig.setIsolatedLeaderCheckInterval(
- new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
+ public Builder dataStoreType(String dataStoreType){
+ datastoreContext.dataStoreType = dataStoreType;
+ // Derive the MXBean name from the type, e.g. "config" -> "DistributedConfigDatastore".
+ datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore";
+ return this;
+ }
- return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
- operationTimeoutInSeconds, shardTransactionIdleTimeout,
- shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity,
- shardInitializationTimeout, shardLeaderElectionTimeout,
- persistent, configurationReader, shardElectionTimeoutFactor);
+ public DatastoreContext build() {
+ return datastoreContext;
}
}
}
private final ActorContext actorContext;
- public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
+ public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
Configuration configuration, DatastoreContext datastoreContext) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
- Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
+ String type = datastoreContext.getDataStoreType();
+
String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
LOG.info("Creating ShardManager : {}", shardManagerId);
actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
- ShardManager.props(type, cluster, configuration, datastoreContext)
+ ShardManager.props(cluster, configuration, datastoreContext)
.withMailbox(ActorContext.MAILBOX), shardManagerId ),
cluster, configuration, datastoreContext);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ actorContext.acquireTxCreationPermit();
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ actorContext.acquireTxCreationPermit();
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
}
private static volatile ActorSystem persistentActorSystem = null;
- public static DistributedDataStore createInstance(String name, SchemaService schemaService,
+ public static DistributedDataStore createInstance(SchemaService schemaService,
DatastoreContext datastoreContext, BundleContext bundleContext) {
ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader());
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore =
- new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+ new DistributedDataStore(actorSystem, new ClusterWrapperImpl(actorSystem),
config, datastoreContext);
ShardStrategyFactory.setConfiguration(config);
private final DataPersistenceProvider dataPersistenceProvider;
/**
- * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
- * configuration or operational
*/
- protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+ protected ShardManager(ClusterWrapper cluster, Configuration configuration,
DatastoreContext datastoreContext) {
- this.type = Preconditions.checkNotNull(type, "type should not be null");
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
this.datastoreContext = datastoreContext;
this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
+ this.type = datastoreContext.getDataStoreType();
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
}
- public static Props props(final String type,
+ public static Props props(
final ClusterWrapper cluster,
final Configuration configuration,
final DatastoreContext datastoreContext) {
- Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
- return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
+ return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
}
@Override
private static class ShardManagerCreator implements Creator<ShardManager> {
private static final long serialVersionUID = 1L;
- final String type;
final ClusterWrapper cluster;
final Configuration configuration;
final DatastoreContext datastoreContext;
- ShardManagerCreator(String type, ClusterWrapper cluster,
+ ShardManagerCreator(ClusterWrapper cluster,
Configuration configuration, DatastoreContext datastoreContext) {
- this.type = type;
this.cluster = cluster;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(type, cluster, configuration, datastoreContext);
+ return new ShardManager(cluster, configuration, datastoreContext);
}
}
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
private final List<Future<ActorSelection>> cohortFutures;
private volatile List<ActorSelection> cohorts;
private final String transactionId;
+ // Callback used when no metrics bookkeeping is wanted (e.g. a commit with no
+ // cohorts); every lifecycle hook is intentionally a no-op.
+ private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+ @Override
+ public void run() {
+ }
+
+ @Override
+ public void success() {
+ }
+
+ @Override
+ public void failure() {
+ }
+ };
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<Future<ActorSelection>> cohortFutures, String transactionId) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
}
-
- futureList.add(actorContext.executeOperationAsync(cohort, message));
+ futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
}
return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
@Override
public ListenableFuture<Void> commit() {
- return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
- CommitTransactionReply.SERIALIZABLE_CLASS, true);
+ // Only instrument the commit when cohorts are actually involved: a commit
+ // with an empty cohort list is effectively a no-op and timing it would skew
+ // the latency metric used for rate-limit adjustment.
+ OperationCallback operationCallback = cohortFutures.isEmpty() ? NO_OP_CALLBACK :
+ new CommitCallback(actorContext);
+
+ return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
+ CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
+ }
+
+ private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException) {
+ return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
}
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException) {
+ final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} {}", transactionId, operationName);
if(cohorts != null) {
finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
- returnFuture);
+ returnFuture, callback);
} else {
buildCohortList().onComplete(new OnComplete<Void>() {
@Override
}
} else {
finishVoidOperation(operationName, message, expectedResponseClass,
- propagateException, returnFuture);
+ propagateException, returnFuture, callback);
}
}
}, actorContext.getActorSystem().dispatcher());
}
private void finishVoidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException,
- final SettableFuture<Void> returnFuture) {
+ final Class<?> expectedResponseClass, final boolean propagateException,
+ final SettableFuture<Void> returnFuture, final OperationCallback callback) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} finish {}", transactionId, operationName);
}
+
+ callback.run();
+
Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
}
if(exceptionToPropagate != null) {
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
operationName, exceptionToPropagate);
}
returnFuture.set(null);
}
+
+ callback.failure();
} else {
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
}
returnFuture.set(null);
+
+ callback.success();
}
}
}, actorContext.getActorSystem().dispatcher());
List<Future<ActorSelection>> getCohortFutures() {
return Collections.unmodifiableList(cohortFutures);
}
+
+ // Lifecycle hooks invoked around a cohort operation: run() is called before the
+ // messages are dispatched, then exactly one of success()/failure() when the
+ // combined future completes.
+ private static interface OperationCallback {
+ void run();
+ void success();
+ void failure();
+ }
+
+ /**
+ * OperationCallback that times commits and feeds the observed 98th-percentile
+ * latency back into the ActorContext's transaction-creation rate limiter.
+ */
+ private static class CommitCallback implements OperationCallback{
+
+ private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
+ private static final String COMMIT = "commit";
+
+ private final Timer commitTimer;
+ private final ActorContext actorContext;
+ private Timer.Context timerContext;
+
+ CommitCallback(ActorContext actorContext){
+ this.actorContext = actorContext;
+ commitTimer = actorContext.getOperationTimer(COMMIT);
+ }
+
+ @Override
+ public void run() {
+ timerContext = commitTimer.time();
+ }
+
+ @Override
+ public void success() {
+ timerContext.stop();
+
+ Snapshot timerSnapshot = commitTimer.getSnapshot();
+ double allowedLatencyInNanos = timerSnapshot.get98thPercentile();
+
+ // Guard against a zero (or negative) percentile, which can occur when the
+ // timer has no meaningful samples yet - dividing by it would produce an
+ // Infinity/NaN rate and corrupt the rate limiter.
+ if(allowedLatencyInNanos <= 0.0) {
+ return;
+ }
+
+ long commitTimeoutInSeconds = actorContext.getDatastoreContext()
+ .getShardTransactionCommitTimeoutInSeconds();
+ long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+ // Here we are trying to find out how many transactions per second are allowed
+ double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
+
+ LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
+ actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
+
+ actorContext.setTxCreationLimit(newRateLimit);
+ }
+
+ @Override
+ public void failure() {
+ // This would mean we couldn't get a transaction completed in 30 seconds which is
+ // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
+ // not going to be useful - so we leave it as it is
+ }
+ }
+
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ actorContext.acquireTxCreationPermit();
return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ actorContext.acquireTxCreationPermit();
return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
}
import akka.dispatch.Mapper;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
* but should not be passed to actors especially remote actors
*/
public class ActorContext {
- private static final Logger
- LOG = LoggerFactory.getLogger(ActorContext.class);
-
- public static final String MAILBOX = "bounded-mailbox";
-
+ private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
+ private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+ private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
+ private static final String METRIC_RATE = "rate";
+ private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
new Mapper<Throwable, Throwable>() {
@Override
return actualFailure;
}
};
+ public static final String MAILBOX = "bounded-mailbox";
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
private final DatastoreContext datastoreContext;
- private volatile SchemaContext schemaContext;
private final FiniteDuration operationDuration;
private final Timeout operationTimeout;
private final String selfAddressHostPort;
+ private final RateLimiter txRateLimiter;
+ private final MetricRegistry metricRegistry = new MetricRegistry();
+ private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
+ private final Timeout transactionCommitOperationTimeout;
+
+ private volatile SchemaContext schemaContext;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
+ this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
- operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
- TimeUnit.SECONDS);
+ operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
operationTimeout = new Timeout(operationDuration);
+ transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+ TimeUnit.SECONDS));
+
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
}
transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
+ jmxReporter.start();
}
public DatastoreContext getDatastoreContext() {
public int getTransactionOutstandingOperationLimit(){
return transactionOutstandingOperationLimit;
}
+
+ /**
+ * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
+ * us to create a timer for pretty much anything.
+ *
+ * @param operationName the name of the operation to be timed, e.g. "commit"
+ * @return a Timer registered under "distributed-data-store.&lt;dataStoreType&gt;.&lt;operationName&gt;.rate"
+ */
+ public Timer getOperationTimer(String operationName){
+ final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
+ return metricRegistry.timer(rate);
+ }
+
+ /**
+ * Get the type of the data store to which this ActorContext belongs
+ *
+ * @return the data store type, e.g. "config" or "operational"
+ */
+ public String getDataStoreType() {
+ return datastoreContext.getDataStoreType();
+ }
+
+ /**
+ * Set the number of transaction creation permits that are to be allowed
+ *
+ * @param permitsPerSecond the new rate, in transaction-creation permits per second
+ */
+ public void setTxCreationLimit(double permitsPerSecond){
+ txRateLimiter.setRate(permitsPerSecond);
+ }
+
+ /**
+ * Get the current transaction creation rate limit
+ *
+ * @return the current limit, in permits per second
+ */
+ public double getTxCreationLimit(){
+ return txRateLimiter.getRate();
+ }
+
+ /**
+ * Try to acquire a transaction creation permit. Will block if no permits are available.
+ */
+ public void acquireTxCreationPermit(){
+ txRateLimiter.acquire();
+ }
+
+ /**
+ * Return the operation timeout to be used when committing transactions
+ *
+ * @return the timeout derived from the configured shard transaction commit timeout
+ */
+ public Timeout getTransactionCommitOperationTimeout(){
+ return transactionCommitOperationTimeout;
+ }
+
+
+
+
}
}
DatastoreContext datastoreContext = DatastoreContext.newBuilder()
- .dataStoreMXBeanType("DistributedConfigDatastore")
+ .dataStoreType("config")
.dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
.shardIsolatedLeaderCheckIntervalInMillis(
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+ .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
.build();
- return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+ return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
datastoreContext, bundleContext);
}
}
DatastoreContext datastoreContext = DatastoreContext.newBuilder()
- .dataStoreMXBeanType("DistributedOperationalDatastore")
+ .dataStoreType("operational")
.dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
.shardIsolatedLeaderCheckIntervalInMillis(
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+ .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
.build();
- return DistributedDataStoreFactory.createInstance("operational",
- getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
+ return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
+ datastoreContext, bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
description "The interval at which the leader of the shard will check if its majority
followers are active and term itself as isolated";
}
+
+ leaf tx-creation-initial-rate-limit {
+ default 100;
+ type non-zero-uint32-type;
+ description "The initial number of transactions per second that are allowed before the data store
+ should begin applying back pressure. This number is only used as initial guidance;
+ subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit";
+ }
}
// Augments the 'configuration' choice node under modules/module.
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for DatastoreContext: verifies that a Builder with no properties
+ * set produces a context whose values all match the DEFAULT_* constants.
+ */
+public class DatastoreContextTest {
+
+    private DatastoreContext.Builder builder;
+
+    @Before
+    public void setUp(){
+        builder = new DatastoreContext.Builder();
+    }
+
+    @Test
+    public void testDefaults(){
+        DatastoreContext build = builder.build();
+
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT, build.getShardTransactionIdleTimeout());
+        assertEquals(DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, build.getOperationTimeoutInSeconds());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, build.getShardTransactionCommitTimeoutInSeconds());
+        assertEquals(DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE, build.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+        assertEquals(DatastoreContext.DEFAULT_SNAPSHOT_BATCH_COUNT, build.getShardRaftConfig().getSnapshotBatchCount());
+        // Raft intervals are durations; compare their lengths against the millis constants.
+        assertEquals(DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getHeartBeatInterval().length());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY, build.getShardTransactionCommitQueueCapacity());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT, build.getShardInitializationTimeout());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT, build.getShardLeaderElectionTimeout());
+        assertEquals(DatastoreContext.DEFAULT_PERSISTENT, build.isPersistent());
+        assertEquals(DatastoreContext.DEFAULT_CONFIGURATION_READER, build.getConfigurationReader());
+        assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckInterval().length());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
+        assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());
+    }
+
+}
\ No newline at end of file
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
ShardStrategyFactory.setConfiguration(config);
+ datastoreContextBuilder.dataStoreType(typeName);
+
DatastoreContext datastoreContext = datastoreContextBuilder.build();
- DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
+
+ DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
config, datastoreContext);
SchemaContext schemaContext = SchemaContextHelper.full();
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Unit tests verifying that DistributedDataStore applies transaction-creation
+ * rate limiting (ActorContext.acquireTxCreationPermit) to write-capable
+ * transactions only.
+ */
+public class DistributedDataStoreTest extends AbstractActorTest {
+
+    private SchemaContext schemaContext;
+
+    @Mock
+    private ActorContext actorContext;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        schemaContext = TestModel.createTestContext();
+
+        doReturn(schemaContext).when(actorContext).getSchemaContext();
+    }
+
+    // A read-write transaction can commit, so creating one must consume a permit.
+    @Test
+    public void testRateLimitingUsedInReadWriteTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newReadWriteTransaction();
+
+        verify(actorContext, times(1)).acquireTxCreationPermit();
+    }
+
+    // A write-only transaction can commit, so creating one must consume a permit.
+    @Test
+    public void testRateLimitingUsedInWriteOnlyTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newWriteOnlyTransaction();
+
+        verify(actorContext, times(1)).acquireTxCreationPermit();
+    }
+
+
+    // Read-only transactions never commit, so no permit should be consumed
+    // no matter how many are created.
+    @Test
+    public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newReadOnlyTransaction();
+        distributedDataStore.newReadOnlyTransaction();
+        distributedDataStore.newReadOnlyTransaction();
+
+        verify(actorContext, times(0)).acquireTxCreationPermit();
+    }
+
+}
\ No newline at end of file
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class ShardManagerTest extends AbstractActorTest {
private static int ID_COUNTER = 1;
}
private Props newShardMgrProps() {
- return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().build());
+
+ DatastoreContext.Builder builder = DatastoreContext.newBuilder();
+ builder.dataStoreType(shardMrgIDSuffix);
+ return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build());
}
@Test
public void testRecoveryApplicable(){
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build());
+ final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build());
final TestActorRef<ShardManager> persistentShardManager =
TestActorRef.create(getSystem(), persistentProps);
assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
- final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(false).build());
+ final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build());
final TestActorRef<ShardManager> nonPersistentShardManager =
TestActorRef.create(getSystem(), nonPersistentProps);
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
+ return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) {
@Override
protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
DataPersistenceProviderMonitor dataPersistenceProviderMonitor
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
TestShardManager(String shardMrgIDSuffix) {
- super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().build());
+ super(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build());
}
@Override
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyDouble;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
@Mock
private ActorContext actorContext;
+ @Mock
+ private DatastoreContext datastoreContext;
+
+ @Mock
+ private Timer commitTimer;
+
+ @Mock
+ private Timer.Context commitTimerContext;
+
+ @Mock
+ private Snapshot commitSnapshot;
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+ doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+ doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+ doReturn(commitTimerContext).when(commitTimer).time();
+ doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+ doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+ doReturn(10.0).when(actorContext).getTxCreationLimit();
}
private Future<ActorSelection> newCohort() {
}
stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
- isA(requestType));
+ isA(requestType), any(Timeout.class));
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
verify(actorContext, times(nCohorts)).executeOperationAsync(
- any(ActorSelection.class), isA(requestType));
+ any(ActorSelection.class), isA(requestType), any(Timeout.class));
}
private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
try {
propagateExecutionExceptionCause(proxy.commit());
} finally {
+
+        verify(actorContext, never()).setTxCreationLimit(anyDouble());
verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
}
+
}
@Test
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
new CommitTransactionReply(), new CommitTransactionReply());
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
proxy.canCommit().get(5, TimeUnit.SECONDS);
proxy.preCommit().get(5, TimeUnit.SECONDS);
proxy.commit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+
+ // Verify that the creation limit was changed to 0.5 (based on setup)
+ verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
+ }
+
+ @Test
+ public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
+ proxy.commit().get(5, TimeUnit.SECONDS);
+
+ verify(actorContext, never()).setTxCreationLimit(anyLong());
}
}
package org.opendaylight.controller.cluster.datastore;
import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
ActorContext actorContext = null;
SchemaContext schemaContext = mock(SchemaContext.class);
+ @Mock
+ ActorContext mockActorContext;
+
@Before
public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
actorContext = new MockActorContext(getSystem());
actorContext.setSchemaContext(schemaContext);
+
+ doReturn(schemaContext).when(mockActorContext).getSchemaContext();
}
@SuppressWarnings("resource")
Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
}
+
+    // Chained read-write transactions are throttled: creation must consume a
+    // tx-creation permit from the ActorContext rate limiter.
+    @Test
+    public void testRateLimitingUsedInReadWriteTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newReadWriteTransaction();
+
+        verify(mockActorContext, times(1)).acquireTxCreationPermit();
+    }
+
+    // Chained write-only transactions are likewise throttled.
+    @Test
+    public void testRateLimitingUsedInWriteOnlyTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newWriteOnlyTransaction();
+
+        verify(mockActorContext, times(1)).acquireTxCreationPermit();
+    }
+
+
+    // Read-only transactions never commit and must not consume a permit.
+    @Test
+    public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newReadOnlyTransaction();
+
+        verify(mockActorContext, times(0)).acquireTxCreationPermit();
+    }
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.time.StopWatch;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
assertEquals(expected, actual);
}
+    // Exercises the real ActorContext rate limiter end-to-end: the initial
+    // limit is seeded from DatastoreContext and acquireTxCreationPermit()
+    // blocks according to the currently configured permits-per-second.
+    @Test
+    public void testRateLimiting(){
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext);
+
+        // Check that the initial value is being picked up from DataStoreContext
+        assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+
+        actorContext.setTxCreationLimit(1.0);
+
+        assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
+
+
+        // At 1 permit/second the first acquire returns immediately and the next
+        // two block roughly a second each, so three acquires take about 2s.
+        StopWatch watch = new StopWatch();
+
+        watch.start();
+
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+
+        watch.stop();
+
+        assertTrue("did not take as much time as expected", watch.getTime() > 1000);
+    }
}