X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDatastoreContext.java;h=dee8142bbc0904e6666784143d8f1cb795f27baa;hp=4f7d68ea52861ae923212d4c8aa13668ae00b473;hb=94362614be25e34e8427c02799daffb8cae29d94;hpb=769ef0f950f2ed6cfc14d274e6a8edc583a36a96 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 4f7d68ea52..dee8142bbc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -10,16 +10,23 @@ package org.opendaylight.controller.cluster.datastore; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; +import com.google.common.base.Preconditions; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.text.WordUtils; +import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import org.opendaylight.controller.cluster.access.client.ClientActorConfig; import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.PeerAddressResolver; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -28,16 +35,17 @@ import scala.concurrent.duration.FiniteDuration; * * @author Thomas Pantelis */ -public class DatastoreContext { +public class DatastoreContext implements ClientActorConfig { public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore"; public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES); public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000; public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30; - public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000; + public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1; public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000; public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500; - public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10; + public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = + DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10; public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000; public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES); public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS); @@ -48,10 +56,17 @@ public class DatastoreContext { public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100; public static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000; - public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); - public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000; + public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = + TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); + public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB - private static Set globalDatastoreTypes = Sets.newConcurrentHashSet(); + public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10; + + private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class); + + private static final Set GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet(); + + private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); private InMemoryDOMDataStoreConfigProperties dataStoreProperties; private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; @@ -64,16 +79,22 @@ public class DatastoreContext { private boolean persistent = DEFAULT_PERSISTENT; private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT; - private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); - private String dataStoreType = UNKNOWN_DATA_STORE_TYPE; + private String dataStoreName = UNKNOWN_DATA_STORE_TYPE; + private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL; + private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY; private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; private boolean writeOnlyTransactionOptimizationsEnabled = true; private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS; + private boolean useTellBasedProtocol = false; private boolean transactionDebugContextEnabled = false; private String shardManagerPersistenceId; + private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE; + private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS; + private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS; + private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS; - public static Set getGlobalDatastoreTypes() { - return globalDatastoreTypes; + public static Set getGlobalDatastoreNames() { + return GLOBAL_DATASTORE_NAMES; } private DatastoreContext() { @@ -83,10 +104,11 @@ public class DatastoreContext { setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS); setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE); setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR); - setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE); + setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD); + setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE); } - private DatastoreContext(DatastoreContext other) { + private DatastoreContext(final DatastoreContext other) { this.dataStoreProperties = other.dataStoreProperties; this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout; this.operationTimeoutInMillis = other.operationTimeoutInMillis; @@ -98,12 +120,18 @@ public class DatastoreContext { this.persistent = other.persistent; this.configurationReader = other.configurationReader; this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; - this.dataStoreType = other.dataStoreType; + this.dataStoreName = other.dataStoreName; + this.logicalStoreType = other.logicalStoreType; + this.storeRoot = other.storeRoot; this.shardBatchedModificationCount = other.shardBatchedModificationCount; this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled; this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis; this.transactionDebugContextEnabled = other.transactionDebugContextEnabled; - this.shardManagerPersistenceId = shardManagerPersistenceId; + this.shardManagerPersistenceId = other.shardManagerPersistenceId; + this.useTellBasedProtocol = other.useTellBasedProtocol; + this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval; + this.requestTimeout = other.requestTimeout; + this.noProgressTimeout = other.noProgressTimeout; setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); @@ -112,15 +140,19 @@ public class DatastoreContext { setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage()); setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor()); setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass()); + setMaximumMessageSliceSize(other.getMaximumMessageSliceSize()); setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize()); setPeerAddressResolver(other.raftConfig.getPeerAddressResolver()); + setTempFileDirectory(other.getTempFileDirectory()); + setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold()); + setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold()); } public static Builder newBuilder() { return new Builder(new DatastoreContext()); } - public static Builder newBuilderFrom(DatastoreContext context) { + public static Builder newBuilderFrom(final DatastoreContext context) { return new Builder(new DatastoreContext(context)); } @@ -168,12 +200,20 @@ public class DatastoreContext { return configurationReader; } - public long getShardElectionTimeoutFactor(){ + public long getShardElectionTimeoutFactor() { return raftConfig.getElectionTimeoutFactor(); } - public String getDataStoreType(){ - return dataStoreType; + public String getDataStoreName() { + return dataStoreName; + } + + public LogicalDatastoreType getLogicalStoreType() { + return logicalStoreType; + } + + public YangInstanceIdentifier getStoreRoot() { + return storeRoot; } public long getTransactionCreationInitialRateLimit() { @@ -184,43 +224,77 @@ public class DatastoreContext { return shardManagerPersistenceId; } - private void setPeerAddressResolver(PeerAddressResolver resolver) { + @Override + public String getTempFileDirectory() { + return raftConfig.getTempFileDirectory(); + } + + private void setTempFileDirectory(final String tempFileDirectory) { + raftConfig.setTempFileDirectory(tempFileDirectory); + } + + @Override + public int getFileBackedStreamingThreshold() { + return raftConfig.getFileBackedStreamingThreshold(); + } + + private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) { + raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold); + } + + private void setPeerAddressResolver(final PeerAddressResolver resolver) { raftConfig.setPeerAddressResolver(resolver); } - private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){ + private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) { raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, TimeUnit.MILLISECONDS)); } - private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){ + private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) { raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); } - private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) { + private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) { raftConfig.setIsolatedLeaderCheckInterval( new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS)); } - private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) { + private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) { raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor); } - private void setCustomRaftPolicyImplementation(String customRaftPolicyImplementation) { + private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) { raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation); } - private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) { + private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) { + Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0 + && shardSnapshotDataThresholdPercentage <= 100); raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); } - private void setSnapshotBatchCount(long shardSnapshotBatchCount) { + private void setSnapshotBatchCount(final long shardSnapshotBatchCount) { raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); } - private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) { - raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize); + @Deprecated + private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) { + // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed + // maximumMessageSliceSize. + if (shardSnapshotChunkSize < maximumMessageSliceSize) { + raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize); + } + } + + private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) { + raftConfig.setSnapshotChunkSize(maximumMessageSliceSize); + this.maximumMessageSliceSize = maximumMessageSliceSize; + } + + private void setSyncIndexThreshold(final long syncIndexThreshold) { + raftConfig.setSyncIndexThreshold(syncIndexThreshold); } public int getShardBatchedModificationCount() { @@ -239,11 +313,31 @@ public class DatastoreContext { return transactionDebugContextEnabled; } - public int getShardSnapshotChunkSize() { - return raftConfig.getSnapshotChunkSize(); + public boolean isUseTellBasedProtocol() { + return useTellBasedProtocol; + } + + @Override + public int getMaximumMessageSliceSize() { + return maximumMessageSliceSize; + } + + @Override + public long getBackendAlivenessTimerInterval() { + return backendAlivenessTimerInterval; } - public static class Builder { + @Override + public long getRequestTimeout() { + return requestTimeout; + } + + @Override + public long getNoProgressTimeout() { + return noProgressTimeout; + } + + public static class Builder implements org.opendaylight.yangtools.concepts.Builder { private final DatastoreContext datastoreContext; private int maxShardDataChangeExecutorPoolSize = InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE; @@ -254,10 +348,10 @@ public class DatastoreContext { private int maxShardDataStoreExecutorQueueSize = InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE; - private Builder(DatastoreContext datastoreContext) { + private Builder(final DatastoreContext datastoreContext) { this.datastoreContext = datastoreContext; - if(datastoreContext.getDataStoreProperties() != null) { + if (datastoreContext.getDataStoreProperties() != null) { maxShardDataChangeExecutorPoolSize = datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize(); maxShardDataChangeExecutorQueueSize = @@ -269,200 +363,267 @@ public class DatastoreContext { } } - public Builder boundedMailboxCapacity(int boundedMailboxCapacity) { + public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) { // TODO - this is defined in the yang DataStoreProperties but not currently used. return this; } - public Builder enableMetricCapture(boolean enableMetricCapture) { + public Builder enableMetricCapture(final boolean enableMetricCapture) { // TODO - this is defined in the yang DataStoreProperties but not currently used. return this; } - public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) { + public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) { datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit); return this; } - public Builder shardTransactionIdleTimeoutInMinutes(long timeout) { + public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) { return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES); } - public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) { + public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) { datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds); return this; } - public Builder operationTimeoutInMillis(long operationTimeoutInMillis) { + public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) { datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis; return this; } - public Builder dataStoreMXBeanType(String dataStoreMXBeanType) { + public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) { datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType; return this; } - public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) { + public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) { datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds; return this; } - public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) { + public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) { datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); return this; } - public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) { + public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) { datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount); return this; } - public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) { + public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) { datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); return this; } - public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) { + public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) { datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis); return this; } - public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) { + public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) { datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity; return this; } - public Builder shardInitializationTimeout(long timeout, TimeUnit unit) { + public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) { datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit); return this; } - public Builder shardInitializationTimeoutInSeconds(long timeout) { + public Builder shardInitializationTimeoutInSeconds(final long timeout) { return shardInitializationTimeout(timeout, TimeUnit.SECONDS); } - public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) { + public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) { datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit); return this; } - public Builder shardLeaderElectionTimeoutInSeconds(long timeout) { + public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) { return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS); } - public Builder configurationReader(AkkaConfigurationReader configurationReader){ + public Builder configurationReader(final AkkaConfigurationReader configurationReader) { datastoreContext.configurationReader = configurationReader; return this; } - public Builder persistent(boolean persistent){ + public Builder persistent(final boolean persistent) { datastoreContext.persistent = persistent; return this; } - public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) { + public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) { datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis); return this; } - public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){ + public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) { datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor); return this; } - public Builder transactionCreationInitialRateLimit(long initialRateLimit){ + public Builder transactionCreationInitialRateLimit(final long initialRateLimit) { datastoreContext.transactionCreationInitialRateLimit = initialRateLimit; return this; } - public Builder dataStoreType(String dataStoreType){ - datastoreContext.dataStoreType = dataStoreType; - datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore"; + public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) { + datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType); + + // Retain compatible naming + switch (logicalStoreType) { + case CONFIGURATION: + dataStoreName("config"); + break; + case OPERATIONAL: + dataStoreName("operational"); + break; + default: + dataStoreName(logicalStoreType.name()); + } + + return this; + } + + public Builder storeRoot(final YangInstanceIdentifier storeRoot) { + datastoreContext.storeRoot = storeRoot; return this; } - public Builder shardBatchedModificationCount(int shardBatchedModificationCount) { + public Builder dataStoreName(final String dataStoreName) { + datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName); + datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore"; + return this; + } + + public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) { datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount; return this; } - public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) { + public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) { datastoreContext.writeOnlyTransactionOptimizationsEnabled = value; return this; } - public Builder shardCommitQueueExpiryTimeoutInMillis(long value) { + public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) { datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value; return this; } - public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) { + public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) { datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert( value, TimeUnit.SECONDS); return this; } - public Builder transactionDebugContextEnabled(boolean value) { + public Builder transactionDebugContextEnabled(final boolean value) { datastoreContext.transactionDebugContextEnabled = value; return this; } - public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) { + public Builder maxShardDataChangeExecutorPoolSize(final int maxShardDataChangeExecutorPoolSize) { this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize; return this; } - public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) { + public Builder maxShardDataChangeExecutorQueueSize(final int maxShardDataChangeExecutorQueueSize) { this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize; return this; } - public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) { + public Builder maxShardDataChangeListenerQueueSize(final int maxShardDataChangeListenerQueueSize) { this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize; return this; } - public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) { + public Builder maxShardDataStoreExecutorQueueSize(final int maxShardDataStoreExecutorQueueSize) { this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize; return this; } + public Builder useTellBasedProtocol(final boolean value) { + datastoreContext.useTellBasedProtocol = value; + return this; + } + /** * For unit tests only. */ @VisibleForTesting - public Builder shardManagerPersistenceId(String id) { + public Builder shardManagerPersistenceId(final String id) { datastoreContext.shardManagerPersistenceId = id; return this; } - public DatastoreContext build() { - datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create( - maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize, - maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize); + public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) { + datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation); + return this; + } - if(datastoreContext.dataStoreType != null) { - globalDatastoreTypes.add(datastoreContext.dataStoreType); - } + @Deprecated + public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) { + LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - " + + "use maximum-message-slice-size instead"); + datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize); + return this; + } - return datastoreContext; + public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) { + datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize); + return this; } - public Builder customRaftPolicyImplementation(String customRaftPolicyImplementation) { - datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation); + public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) { + datastoreContext.setPeerAddressResolver(resolver); return this; } - public Builder shardSnapshotChunkSize(int shardSnapshotChunkSize) { - datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize); + public Builder tempFileDirectory(final String tempFileDirectory) { + datastoreContext.setTempFileDirectory(tempFileDirectory); return this; } - public Builder shardPeerAddressResolver(PeerAddressResolver resolver) { - datastoreContext.setPeerAddressResolver(resolver); + public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) { + datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE); + return this; + } + + public Builder syncIndexThreshold(final long syncIndexThreshold) { + datastoreContext.setSyncIndexThreshold(syncIndexThreshold); + return this; + } + + public Builder backendAlivenessTimerIntervalInSeconds(final long interval) { + datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval); + return this; + } + + public Builder frontendRequestTimeoutInSeconds(final long timeout) { + datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout); + return this; + } + + public Builder frontendNoProgressTimeoutInSeconds(final long timeout) { + datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout); return this; } + + @Override + public DatastoreContext build() { + datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create( + maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize, + maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize); + + if (datastoreContext.dataStoreName != null) { + GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName); + } + + return datastoreContext; + } } }