X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDatastoreContext.java;h=17e0a5c094f181e6115083a0085e8c33cf81301c;hp=dee8142bbc0904e6666784143d8f1cb795f27baa;hb=b5d01b5d28d52413bf45b626b746c1519685058c;hpb=94362614be25e34e8427c02799daffb8cae29d94 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index dee8142bbc..17e0a5c094 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -5,16 +5,17 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.text.WordUtils; +import org.apache.commons.text.WordUtils; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ClientActorConfig; import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; @@ -22,12 +23,11 @@ import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationRea import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.PeerAddressResolver; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -35,30 +35,40 @@ import scala.concurrent.duration.FiniteDuration; * * @author Thomas Pantelis */ +// Non-final for mocking public class DatastoreContext implements ClientActorConfig { public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore"; - public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES); + public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10, + TimeUnit.MINUTES); public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000; public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30; public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1; public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000; + public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0; public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500; public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10; public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000; public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES); public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS); + public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3; public static final boolean DEFAULT_PERSISTENT = true; + public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false; public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader(); public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12; + public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD = 0; public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2; + public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1; public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100; public static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000; public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); - public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB + public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 480 * 1024; // 480KiB + public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512; + public static final ExportOnRecovery DEFAULT_EXPORT_ON_RECOVERY = ExportOnRecovery.Off; + public static final String DEFAULT_RECOVERY_EXPORT_BASE_DIR = "persistence-export"; public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10; @@ -68,77 +78,93 @@ public class DatastoreContext implements ClientActorConfig { private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); - private InMemoryDOMDataStoreConfigProperties dataStoreProperties; - private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; + private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS; private String dataStoreMXBeanType; private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS; private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY; private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT; private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT; + private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER; private boolean persistent = DEFAULT_PERSISTENT; + private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE; private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT; private String dataStoreName = UNKNOWN_DATA_STORE_TYPE; private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL; - private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY; + private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty(); private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; private boolean writeOnlyTransactionOptimizationsEnabled = true; private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS; - private boolean useTellBasedProtocol = false; + private boolean useTellBasedProtocol = true; private boolean transactionDebugContextEnabled = false; private String shardManagerPersistenceId; private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE; private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS; private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS; private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS; + private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY; + private boolean useLz4Compression = false; + private ExportOnRecovery exportOnRecovery = DEFAULT_EXPORT_ON_RECOVERY; + private String recoveryExportBaseDir = DEFAULT_RECOVERY_EXPORT_BASE_DIR; public static Set getGlobalDatastoreNames() { return GLOBAL_DATASTORE_NAMES; } - private DatastoreContext() { + DatastoreContext() { setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE); setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT); + setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS); setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS); setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS); setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE); + setSnapshotDataThreshold(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD); setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR); + setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR); setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD); setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE); } private DatastoreContext(final DatastoreContext other) { - this.dataStoreProperties = other.dataStoreProperties; - this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout; - this.operationTimeoutInMillis = other.operationTimeoutInMillis; - this.dataStoreMXBeanType = other.dataStoreMXBeanType; - this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds; - this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity; - this.shardInitializationTimeout = other.shardInitializationTimeout; - this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout; - this.persistent = other.persistent; - this.configurationReader = other.configurationReader; - this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; - this.dataStoreName = other.dataStoreName; - this.logicalStoreType = other.logicalStoreType; - this.storeRoot = other.storeRoot; - this.shardBatchedModificationCount = other.shardBatchedModificationCount; - this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled; - this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis; - this.transactionDebugContextEnabled = other.transactionDebugContextEnabled; - this.shardManagerPersistenceId = other.shardManagerPersistenceId; - this.useTellBasedProtocol = other.useTellBasedProtocol; - this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval; - this.requestTimeout = other.requestTimeout; - this.noProgressTimeout = other.noProgressTimeout; + shardTransactionIdleTimeout = other.shardTransactionIdleTimeout; + operationTimeoutInMillis = other.operationTimeoutInMillis; + dataStoreMXBeanType = other.dataStoreMXBeanType; + shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds; + shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity; + shardInitializationTimeout = other.shardInitializationTimeout; + shardLeaderElectionTimeout = other.shardLeaderElectionTimeout; + initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier; + persistent = other.persistent; + snapshotOnRootOverwrite = other.snapshotOnRootOverwrite; + configurationReader = other.configurationReader; + transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; + dataStoreName = other.dataStoreName; + logicalStoreType = other.logicalStoreType; + storeRoot = other.storeRoot; + shardBatchedModificationCount = other.shardBatchedModificationCount; + writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled; + shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis; + transactionDebugContextEnabled = other.transactionDebugContextEnabled; + shardManagerPersistenceId = other.shardManagerPersistenceId; + useTellBasedProtocol = other.useTellBasedProtocol; + backendAlivenessTimerInterval = other.backendAlivenessTimerInterval; + requestTimeout = other.requestTimeout; + noProgressTimeout = other.noProgressTimeout; + initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity; + useLz4Compression = other.useLz4Compression; + exportOnRecovery = other.exportOnRecovery; + recoveryExportBaseDir = other.recoveryExportBaseDir; setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); + setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds()); setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis()); setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis()); setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage()); + setSnapshotDataThreshold(other.raftConfig.getSnapshotDataThreshold()); setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor()); + setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor()); setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass()); setMaximumMessageSliceSize(other.getMaximumMessageSliceSize()); setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize()); @@ -156,11 +182,7 @@ public class DatastoreContext implements ClientActorConfig { return new Builder(new DatastoreContext(context)); } - public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() { - return dataStoreProperties; - } - - public Duration getShardTransactionIdleTimeout() { + public FiniteDuration getShardTransactionIdleTimeout() { return shardTransactionIdleTimeout; } @@ -192,10 +214,24 @@ public class DatastoreContext implements ClientActorConfig { return shardLeaderElectionTimeout; } + /** + * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards + * on the local node to settle. + * + * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely. + */ + public int getInitialSettleTimeoutMultiplier() { + return initialSettleTimeoutMultiplier; + } + public boolean isPersistent() { return persistent; } + public boolean isSnapshotOnRootOverwrite() { + return snapshotOnRootOverwrite; + } + public AkkaConfigurationReader getConfigurationReader() { return configurationReader; } @@ -265,20 +301,36 @@ public class DatastoreContext implements ClientActorConfig { raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor); } + private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) { + raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor); + } + private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) { raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation); } private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) { - Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0 - && shardSnapshotDataThresholdPercentage <= 100); + checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100); raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); } + private void setSnapshotDataThreshold(final int shardSnapshotDataThreshold) { + checkArgument(shardSnapshotDataThreshold >= 0); + raftConfig.setSnapshotDataThreshold(shardSnapshotDataThreshold); + } + private void setSnapshotBatchCount(final long shardSnapshotBatchCount) { raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); } + /** + * Set the interval in seconds after which a snapshot should be taken during the recovery process. + * 0 means don't take snapshots + */ + private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) { + raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval); + } + @Deprecated private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) { // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed @@ -317,6 +369,18 @@ public class DatastoreContext implements ClientActorConfig { return useTellBasedProtocol; } + public boolean isUseLz4Compression() { + return useLz4Compression; + } + + public ExportOnRecovery getExportOnRecovery() { + return exportOnRecovery; + } + + public String getRecoveryExportBaseDir() { + return recoveryExportBaseDir; + } + @Override public int getMaximumMessageSliceSize() { return maximumMessageSliceSize; @@ -337,30 +401,15 @@ public class DatastoreContext implements ClientActorConfig { return noProgressTimeout; } - public static class Builder implements org.opendaylight.yangtools.concepts.Builder { + public int getInitialPayloadSerializedBufferCapacity() { + return initialPayloadSerializedBufferCapacity; + } + + public static class Builder { private final DatastoreContext datastoreContext; - private int maxShardDataChangeExecutorPoolSize = - InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE; - private int maxShardDataChangeExecutorQueueSize = - InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE; - private int maxShardDataChangeListenerQueueSize = - InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE; - private int maxShardDataStoreExecutorQueueSize = - InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE; - - private Builder(final DatastoreContext datastoreContext) { - this.datastoreContext = datastoreContext; - if (datastoreContext.getDataStoreProperties() != null) { - maxShardDataChangeExecutorPoolSize = - datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize(); - maxShardDataChangeExecutorQueueSize = - datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize(); - maxShardDataChangeListenerQueueSize = - datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize(); - maxShardDataStoreExecutorQueueSize = - datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize(); - } + Builder(final DatastoreContext datastoreContext) { + this.datastoreContext = datastoreContext; } public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) { @@ -375,7 +424,7 @@ public class DatastoreContext implements ClientActorConfig { public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) { - datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit); + datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit); return this; } @@ -413,11 +462,22 @@ public class DatastoreContext implements ClientActorConfig { return this; } + public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) { + checkArgument(recoverySnapshotIntervalSeconds >= 0); + datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds); + return this; + } + public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) { datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); return this; } + public Builder shardSnapshotDataThreshold(final int shardSnapshotDataThreshold) { + datastoreContext.setSnapshotDataThreshold(shardSnapshotDataThreshold); + return this; + } + public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) { datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis); return this; @@ -442,6 +502,12 @@ public class DatastoreContext implements ClientActorConfig { return this; } + public Builder initialSettleTimeoutMultiplier(final int multiplier) { + checkArgument(multiplier >= 0); + datastoreContext.initialSettleTimeoutMultiplier = multiplier; + return this; + } + public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) { return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS); } @@ -456,6 +522,11 @@ public class DatastoreContext implements ClientActorConfig { return this; } + public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) { + datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite; + return this; + } + public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) { datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis); return this; @@ -466,13 +537,18 @@ public class DatastoreContext implements ClientActorConfig { return this; } + public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) { + datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor); + return this; + } + public Builder transactionCreationInitialRateLimit(final long initialRateLimit) { datastoreContext.transactionCreationInitialRateLimit = initialRateLimit; return this; } public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) { - datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType); + datastoreContext.logicalStoreType = requireNonNull(logicalStoreType); // Retain compatible naming switch (logicalStoreType) { @@ -495,7 +571,7 @@ public class DatastoreContext implements ClientActorConfig { } public Builder dataStoreName(final String dataStoreName) { - datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName); + datastoreContext.dataStoreName = requireNonNull(dataStoreName); datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore"; return this; } @@ -526,28 +602,23 @@ public class DatastoreContext implements ClientActorConfig { return this; } - public Builder maxShardDataChangeExecutorPoolSize(final int maxShardDataChangeExecutorPoolSize) { - this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize; - return this; - } - - public Builder maxShardDataChangeExecutorQueueSize(final int maxShardDataChangeExecutorQueueSize) { - this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize; + public Builder useTellBasedProtocol(final boolean value) { + datastoreContext.useTellBasedProtocol = value; return this; } - public Builder maxShardDataChangeListenerQueueSize(final int maxShardDataChangeListenerQueueSize) { - this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize; + public Builder useLz4Compression(final boolean value) { + datastoreContext.useLz4Compression = value; return this; } - public Builder maxShardDataStoreExecutorQueueSize(final int maxShardDataStoreExecutorQueueSize) { - this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize; + public Builder exportOnRecovery(final ExportOnRecovery value) { + datastoreContext.exportOnRecovery = value; return this; } - public Builder useTellBasedProtocol(final boolean value) { - datastoreContext.useTellBasedProtocol = value; + public Builder recoveryExportBaseDir(final String value) { + datastoreContext.recoveryExportBaseDir = value; return this; } @@ -613,12 +684,12 @@ public class DatastoreContext implements ClientActorConfig { return this; } - @Override - public DatastoreContext build() { - datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create( - maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize, - maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize); + public Builder initialPayloadSerializedBufferCapacity(final int capacity) { + datastoreContext.initialPayloadSerializedBufferCapacity = capacity; + return this; + } + public DatastoreContext build() { if (datastoreContext.dataStoreName != null) { GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName); }