import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
- public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000;
+ public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
- public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
+ public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
+ DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
- public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
+ public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
+ TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
- private static final Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
+ private static final Set<String> GLOBAL_DATASTORE_NAMES = Sets.newConcurrentHashSet();
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
+ private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
private boolean writeOnlyTransactionOptimizationsEnabled = true;
private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
+ private boolean useTellBasedProtocol = false;
private boolean transactionDebugContextEnabled = false;
private String shardManagerPersistenceId;
- public static Set<String> getGlobalDatastoreTypes() {
- return globalDatastoreTypes;
+ public static Set<String> getGlobalDatastoreNames() {
+ return GLOBAL_DATASTORE_NAMES;
}
private DatastoreContext() {
setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
}
- private DatastoreContext(DatastoreContext other) {
+ private DatastoreContext(final DatastoreContext other) {
this.dataStoreProperties = other.dataStoreProperties;
this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
this.operationTimeoutInMillis = other.operationTimeoutInMillis;
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
this.dataStoreName = other.dataStoreName;
this.logicalStoreType = other.logicalStoreType;
+ this.storeRoot = other.storeRoot;
this.shardBatchedModificationCount = other.shardBatchedModificationCount;
this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
this.shardManagerPersistenceId = other.shardManagerPersistenceId;
+ this.useTellBasedProtocol = other.useTellBasedProtocol;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
+ setTempFileDirectory(other.getTempFileDirectory());
+ setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
}
public static Builder newBuilder() {
return configurationReader;
}
- public long getShardElectionTimeoutFactor(){
+ public long getShardElectionTimeoutFactor() {
return raftConfig.getElectionTimeoutFactor();
}
- public String getDataStoreName(){
+ public String getDataStoreName() {
return dataStoreName;
}
return logicalStoreType;
}
+ public YangInstanceIdentifier getStoreRoot() {
+ return storeRoot;
+ }
+
public long getTransactionCreationInitialRateLimit() {
return transactionCreationInitialRateLimit;
}
return shardManagerPersistenceId;
}
+ public String getTempFileDirectory() {
+ return raftConfig.getTempFileDirectory();
+ }
+
+ private void setTempFileDirectory(String tempFileDirectory) {
+ raftConfig.setTempFileDirectory(tempFileDirectory);
+ }
+
+ public int getFileBackedStreamingThreshold() {
+ return raftConfig.getFileBackedStreamingThreshold();
+ }
+
+ private void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+ raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
+ }
+
private void setPeerAddressResolver(PeerAddressResolver resolver) {
raftConfig.setPeerAddressResolver(resolver);
}
- private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
+ private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis) {
raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
TimeUnit.MILLISECONDS));
}
- private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){
+ private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
}
}
private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+ Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
+ && shardSnapshotDataThresholdPercentage <= 100);
raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
}
return transactionDebugContextEnabled;
}
+ public boolean isUseTellBasedProtocol() {
+ return useTellBasedProtocol;
+ }
+
public int getShardSnapshotChunkSize() {
return raftConfig.getSnapshotChunkSize();
}
private Builder(DatastoreContext datastoreContext) {
this.datastoreContext = datastoreContext;
- if(datastoreContext.getDataStoreProperties() != null) {
+ if (datastoreContext.getDataStoreProperties() != null) {
maxShardDataChangeExecutorPoolSize =
datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
maxShardDataChangeExecutorQueueSize =
return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
}
- public Builder configurationReader(AkkaConfigurationReader configurationReader){
+ public Builder configurationReader(AkkaConfigurationReader configurationReader) {
datastoreContext.configurationReader = configurationReader;
return this;
}
- public Builder persistent(boolean persistent){
+ public Builder persistent(boolean persistent) {
datastoreContext.persistent = persistent;
return this;
}
return this;
}
- public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){
+ public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor) {
datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
return this;
}
- public Builder transactionCreationInitialRateLimit(long initialRateLimit){
+ public Builder transactionCreationInitialRateLimit(long initialRateLimit) {
datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
return this;
}
- /**
- * @deprecated Use {@link #logicalStoreType(LogicalDatastoreType)} or {@link #dataStoreName(String)}.
- */
- @Deprecated
- public Builder dataStoreType(String dataStoreType){
- return dataStoreName(dataStoreType);
- }
-
- public Builder logicalStoreType(LogicalDatastoreType logicalStoreType){
+ public Builder logicalStoreType(LogicalDatastoreType logicalStoreType) {
datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
// Retain compatible naming
switch (logicalStoreType) {
- case CONFIGURATION:
- dataStoreName("config");
- break;
- case OPERATIONAL:
- dataStoreName("operational");
- break;
- default:
- dataStoreName(logicalStoreType.name());
+ case CONFIGURATION:
+ dataStoreName("config");
+ break;
+ case OPERATIONAL:
+ dataStoreName("operational");
+ break;
+ default:
+ dataStoreName(logicalStoreType.name());
}
return this;
}
- public Builder dataStoreName(String dataStoreName){
+ public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
+ datastoreContext.storeRoot = storeRoot;
+ return this;
+ }
+
+ public Builder dataStoreName(String dataStoreName) {
datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
return this;
}
- public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
+ public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
return this;
}
return this;
}
+ public Builder useTellBasedProtocol(boolean value) {
+ datastoreContext.useTellBasedProtocol = value;
+ return this;
+ }
+
/**
* For unit tests only.
*/
maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
- if(datastoreContext.dataStoreName != null) {
- globalDatastoreTypes.add(datastoreContext.dataStoreName);
+ if (datastoreContext.dataStoreName != null) {
+ GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
}
return datastoreContext;
datastoreContext.setPeerAddressResolver(resolver);
return this;
}
+
+ public Builder tempFileDirectory(String tempFileDirectory) {
+ datastoreContext.setTempFileDirectory(tempFileDirectory);
+ return this;
+ }
+
+ public Builder fileBackedStreamingThresholdInMegabytes(int fileBackedStreamingThreshold) {
+ datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
+ return this;
+ }
}
}