import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.text.WordUtils;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
*
* @author Thomas Pantelis
*/
-public class DatastoreContext {
+// Noo-final for mocking
+public class DatastoreContext implements ClientActorConfig {
public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
- public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
+ public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
- private static final Set<String> GLOBAL_DATASTORE_NAMES = Sets.newConcurrentHashSet();
+ public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
+
+ private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
+
+ private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
+
+ private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
private boolean persistent = DEFAULT_PERSISTENT;
private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
- private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
+ private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
private boolean writeOnlyTransactionOptimizationsEnabled = true;
private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
+ private boolean useTellBasedProtocol = false;
private boolean transactionDebugContextEnabled = false;
private String shardManagerPersistenceId;
+ private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
+ private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
+ private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
+ private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
public static Set<String> getGlobalDatastoreNames() {
return GLOBAL_DATASTORE_NAMES;
}
- private DatastoreContext() {
+ DatastoreContext() {
setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
- setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
+ setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
+ setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
}
- private DatastoreContext(DatastoreContext other) {
+ private DatastoreContext(final DatastoreContext other) {
this.dataStoreProperties = other.dataStoreProperties;
this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
this.operationTimeoutInMillis = other.operationTimeoutInMillis;
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
this.dataStoreName = other.dataStoreName;
this.logicalStoreType = other.logicalStoreType;
+ this.storeRoot = other.storeRoot;
this.shardBatchedModificationCount = other.shardBatchedModificationCount;
this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
this.shardManagerPersistenceId = other.shardManagerPersistenceId;
+ this.useTellBasedProtocol = other.useTellBasedProtocol;
+ this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
+ this.requestTimeout = other.requestTimeout;
+ this.noProgressTimeout = other.noProgressTimeout;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
+ setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
+ setTempFileDirectory(other.getTempFileDirectory());
+ setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
+ setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
}
public static Builder newBuilder() {
return new Builder(new DatastoreContext());
}
- public static Builder newBuilderFrom(DatastoreContext context) {
+ public static Builder newBuilderFrom(final DatastoreContext context) {
return new Builder(new DatastoreContext(context));
}
return logicalStoreType;
}
+ public YangInstanceIdentifier getStoreRoot() {
+ return storeRoot;
+ }
+
public long getTransactionCreationInitialRateLimit() {
return transactionCreationInitialRateLimit;
}
return shardManagerPersistenceId;
}
- private void setPeerAddressResolver(PeerAddressResolver resolver) {
+ @Override
+ public String getTempFileDirectory() {
+ return raftConfig.getTempFileDirectory();
+ }
+
+ private void setTempFileDirectory(final String tempFileDirectory) {
+ raftConfig.setTempFileDirectory(tempFileDirectory);
+ }
+
+ @Override
+ public int getFileBackedStreamingThreshold() {
+ return raftConfig.getFileBackedStreamingThreshold();
+ }
+
+ private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
+ raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
+ }
+
+ private void setPeerAddressResolver(final PeerAddressResolver resolver) {
raftConfig.setPeerAddressResolver(resolver);
}
- private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis) {
+ private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
TimeUnit.MILLISECONDS));
}
- private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
+ private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
}
- private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
+ private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
raftConfig.setIsolatedLeaderCheckInterval(
new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
}
- private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+ private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
}
- private void setCustomRaftPolicyImplementation(String customRaftPolicyImplementation) {
+ private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
}
- private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+ private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
&& shardSnapshotDataThresholdPercentage <= 100);
raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
}
- private void setSnapshotBatchCount(long shardSnapshotBatchCount) {
+ private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
- private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) {
- raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
+ @Deprecated
+ private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
+ // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
+ // maximumMessageSliceSize.
+ if (shardSnapshotChunkSize < maximumMessageSliceSize) {
+ raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
+ }
+ }
+
+ private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
+ raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
+ this.maximumMessageSliceSize = maximumMessageSliceSize;
+ }
+
+ private void setSyncIndexThreshold(final long syncIndexThreshold) {
+ raftConfig.setSyncIndexThreshold(syncIndexThreshold);
}
public int getShardBatchedModificationCount() {
return transactionDebugContextEnabled;
}
- public int getShardSnapshotChunkSize() {
- return raftConfig.getSnapshotChunkSize();
+ public boolean isUseTellBasedProtocol() {
+ return useTellBasedProtocol;
}
- public static class Builder {
+ @Override
+ public int getMaximumMessageSliceSize() {
+ return maximumMessageSliceSize;
+ }
+
+ @Override
+ public long getBackendAlivenessTimerInterval() {
+ return backendAlivenessTimerInterval;
+ }
+
+ @Override
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ @Override
+ public long getNoProgressTimeout() {
+ return noProgressTimeout;
+ }
+
+ public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
private final DatastoreContext datastoreContext;
private int maxShardDataChangeExecutorPoolSize =
InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
private int maxShardDataStoreExecutorQueueSize =
InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
- private Builder(DatastoreContext datastoreContext) {
+ Builder(final DatastoreContext datastoreContext) {
this.datastoreContext = datastoreContext;
if (datastoreContext.getDataStoreProperties() != null) {
}
}
- public Builder boundedMailboxCapacity(int boundedMailboxCapacity) {
+ public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
// TODO - this is defined in the yang DataStoreProperties but not currently used.
return this;
}
- public Builder enableMetricCapture(boolean enableMetricCapture) {
+ public Builder enableMetricCapture(final boolean enableMetricCapture) {
// TODO - this is defined in the yang DataStoreProperties but not currently used.
return this;
}
- public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) {
+ public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
return this;
}
- public Builder shardTransactionIdleTimeoutInMinutes(long timeout) {
+ public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
}
- public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
+ public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
return this;
}
- public Builder operationTimeoutInMillis(long operationTimeoutInMillis) {
+ public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
return this;
}
- public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
+ public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
return this;
}
- public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
+ public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
return this;
}
- public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
+ public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
return this;
}
- public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
+ public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
return this;
}
- public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+ public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
return this;
}
- public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
+ public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
return this;
}
- public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
+ public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
return this;
}
- public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
+ public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
return this;
}
- public Builder shardInitializationTimeoutInSeconds(long timeout) {
+ public Builder shardInitializationTimeoutInSeconds(final long timeout) {
return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
}
- public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
+ public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
return this;
}
- public Builder shardLeaderElectionTimeoutInSeconds(long timeout) {
+ public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
}
- public Builder configurationReader(AkkaConfigurationReader configurationReader) {
+ public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
datastoreContext.configurationReader = configurationReader;
return this;
}
- public Builder persistent(boolean persistent) {
+ public Builder persistent(final boolean persistent) {
datastoreContext.persistent = persistent;
return this;
}
- public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
+ public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
return this;
}
- public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+ public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
return this;
}
- public Builder transactionCreationInitialRateLimit(long initialRateLimit) {
+ public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
return this;
}
- public Builder logicalStoreType(LogicalDatastoreType logicalStoreType) {
+ public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
// Retain compatible naming
return this;
}
- public Builder dataStoreName(String dataStoreName) {
+ public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
+ datastoreContext.storeRoot = storeRoot;
+ return this;
+ }
+
+ public Builder dataStoreName(final String dataStoreName) {
datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
return this;
}
- public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
+ public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
return this;
}
- public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
+ public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
return this;
}
- public Builder shardCommitQueueExpiryTimeoutInMillis(long value) {
+ public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
return this;
}
- public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) {
+ public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
value, TimeUnit.SECONDS);
return this;
}
- public Builder transactionDebugContextEnabled(boolean value) {
+ public Builder transactionDebugContextEnabled(final boolean value) {
datastoreContext.transactionDebugContextEnabled = value;
return this;
}
- public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
- this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
+ public Builder maxShardDataChangeExecutorPoolSize(final int newMaxShardDataChangeExecutorPoolSize) {
+ this.maxShardDataChangeExecutorPoolSize = newMaxShardDataChangeExecutorPoolSize;
return this;
}
- public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) {
- this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
+ public Builder maxShardDataChangeExecutorQueueSize(final int newMaxShardDataChangeExecutorQueueSize) {
+ this.maxShardDataChangeExecutorQueueSize = newMaxShardDataChangeExecutorQueueSize;
return this;
}
- public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) {
- this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
+ public Builder maxShardDataChangeListenerQueueSize(final int newMaxShardDataChangeListenerQueueSize) {
+ this.maxShardDataChangeListenerQueueSize = newMaxShardDataChangeListenerQueueSize;
return this;
}
- public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) {
- this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
+ public Builder maxShardDataStoreExecutorQueueSize(final int newMaxShardDataStoreExecutorQueueSize) {
+ this.maxShardDataStoreExecutorQueueSize = newMaxShardDataStoreExecutorQueueSize;
+ return this;
+ }
+
+ public Builder useTellBasedProtocol(final boolean value) {
+ datastoreContext.useTellBasedProtocol = value;
return this;
}
* For unit tests only.
*/
@VisibleForTesting
- public Builder shardManagerPersistenceId(String id) {
+ public Builder shardManagerPersistenceId(final String id) {
datastoreContext.shardManagerPersistenceId = id;
return this;
}
- public DatastoreContext build() {
- datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
- maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
- maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
+ public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
+ datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
+ return this;
+ }
- if (datastoreContext.dataStoreName != null) {
- GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
- }
+ @Deprecated
+ public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
+ LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
+ + "use maximum-message-slice-size instead");
+ datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
+ return this;
+ }
- return datastoreContext;
+ public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
+ datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
+ return this;
}
- public Builder customRaftPolicyImplementation(String customRaftPolicyImplementation) {
- datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
+ public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
+ datastoreContext.setPeerAddressResolver(resolver);
return this;
}
- public Builder shardSnapshotChunkSize(int shardSnapshotChunkSize) {
- datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
+ public Builder tempFileDirectory(final String tempFileDirectory) {
+ datastoreContext.setTempFileDirectory(tempFileDirectory);
return this;
}
- public Builder shardPeerAddressResolver(PeerAddressResolver resolver) {
- datastoreContext.setPeerAddressResolver(resolver);
+ public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
+ datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
+ return this;
+ }
+
+ public Builder syncIndexThreshold(final long syncIndexThreshold) {
+ datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
+ return this;
+ }
+
+ public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
+ datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
+ return this;
+ }
+
+ public Builder frontendRequestTimeoutInSeconds(final long timeout) {
+ datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
+ return this;
+ }
+
+ public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
+ datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
return this;
}
+
+ @Override
+ public DatastoreContext build() {
+ datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
+ maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
+ maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
+
+ if (datastoreContext.dataStoreName != null) {
+ GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
+ }
+
+ return datastoreContext;
+ }
}
}