package org.opendaylight.controller.cluster.datastore;
import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
- public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000;
+ public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
+ public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
- private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
+ private static final Set<String> globalDatastoreNames = Sets.newConcurrentHashSet();
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
- private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
+ private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
+ private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
private boolean writeOnlyTransactionOptimizationsEnabled = true;
private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
private boolean transactionDebugContextEnabled = false;
- private String customRaftPolicyImplementation = "";
+ private String shardManagerPersistenceId;
- public static Set<String> getGlobalDatastoreTypes() {
- return globalDatastoreTypes;
+ public static Set<String> getGlobalDatastoreNames() {
+ return globalDatastoreNames;
}
private DatastoreContext() {
setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
+ setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
}
private DatastoreContext(DatastoreContext other) {
this.persistent = other.persistent;
this.configurationReader = other.configurationReader;
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
- this.dataStoreType = other.dataStoreType;
+ this.dataStoreName = other.dataStoreName;
+ this.logicalStoreType = other.logicalStoreType;
this.shardBatchedModificationCount = other.shardBatchedModificationCount;
this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
- this.customRaftPolicyImplementation = other.customRaftPolicyImplementation;
+ this.shardManagerPersistenceId = other.shardManagerPersistenceId;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
- setCustomRaftPolicyImplementation(other.customRaftPolicyImplementation);
-
+ setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
+ setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
+ setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
}
public static Builder newBuilder() {
return raftConfig.getElectionTimeoutFactor();
}
- public String getDataStoreType(){
- return dataStoreType;
+ public String getDataStoreName(){
+ return dataStoreName;
+ }
+
+ public LogicalDatastoreType getLogicalStoreType() {
+ return logicalStoreType;
}
public long getTransactionCreationInitialRateLimit() {
return transactionCreationInitialRateLimit;
}
+ public String getShardManagerPersistenceId() {
+ return shardManagerPersistenceId;
+ }
+
+ private void setPeerAddressResolver(PeerAddressResolver resolver) {
+ raftConfig.setPeerAddressResolver(resolver);
+ }
+
private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
TimeUnit.MILLISECONDS));
raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
}
-
private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+ Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
+ && shardSnapshotDataThresholdPercentage <= 100);
raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
}
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
+ private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) {
+ raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
+ }
+
public int getShardBatchedModificationCount() {
return shardBatchedModificationCount;
}
return transactionDebugContextEnabled;
}
+ public int getShardSnapshotChunkSize() {
+ return raftConfig.getSnapshotChunkSize();
+ }
+
public static class Builder {
private final DatastoreContext datastoreContext;
private int maxShardDataChangeExecutorPoolSize =
return this;
}
- public Builder dataStoreType(String dataStoreType){
- datastoreContext.dataStoreType = dataStoreType;
- datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore";
+ public Builder logicalStoreType(LogicalDatastoreType logicalStoreType){
+ datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
+
+ // Retain compatible naming
+ switch (logicalStoreType) {
+ case CONFIGURATION:
+ dataStoreName("config");
+ break;
+ case OPERATIONAL:
+ dataStoreName("operational");
+ break;
+ default:
+ dataStoreName(logicalStoreType.name());
+ }
+
+ return this;
+ }
+
+ public Builder dataStoreName(String dataStoreName){
+ datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
+ datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
return this;
}
return this;
}
+ /**
+ * For unit tests only.
+ */
+ @VisibleForTesting
+ public Builder shardManagerPersistenceId(String id) {
+ datastoreContext.shardManagerPersistenceId = id;
+ return this;
+ }
+
public DatastoreContext build() {
datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
- if(datastoreContext.dataStoreType != null) {
- globalDatastoreTypes.add(datastoreContext.dataStoreType);
+ if(datastoreContext.dataStoreName != null) {
+ globalDatastoreNames.add(datastoreContext.dataStoreName);
}
return datastoreContext;
datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
return this;
}
+
+ public Builder shardSnapshotChunkSize(int shardSnapshotChunkSize) {
+ datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
+ return this;
+ }
+
+ public Builder shardPeerAddressResolver(PeerAddressResolver resolver) {
+ datastoreContext.setPeerAddressResolver(resolver);
+ return this;
+ }
}
}