* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.datastore;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.text.WordUtils;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
*
* @author Thomas Pantelis
*/
-public class DatastoreContext {
+// Non-final for mocking
+public class DatastoreContext implements ClientActorConfig {
public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
- public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
+ public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
+ TimeUnit.MINUTES);
public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
+ public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
+ public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
public static final boolean DEFAULT_PERSISTENT = true;
+ public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false;
public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
+ public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD = 0;
public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
+ public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
+ public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
+ public static final ExportOnRecovery DEFAULT_EXPORT_ON_RECOVERY = ExportOnRecovery.Off;
+ public static final String DEFAULT_RECOVERY_EXPORT_BASE_DIR = "persistence-export";
public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
- private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
- private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
+ private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
private String dataStoreMXBeanType;
private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+ private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
private boolean persistent = DEFAULT_PERSISTENT;
+ private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE;
private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
- private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
+ private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty();
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
private boolean writeOnlyTransactionOptimizationsEnabled = true;
private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
private boolean transactionDebugContextEnabled = false;
private String shardManagerPersistenceId;
private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
+ private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
+ private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
+ private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
+ private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
+ private boolean useLz4Compression = false;
+ private ExportOnRecovery exportOnRecovery = DEFAULT_EXPORT_ON_RECOVERY;
+ private String recoveryExportBaseDir = DEFAULT_RECOVERY_EXPORT_BASE_DIR;
public static Set<String> getGlobalDatastoreNames() {
return GLOBAL_DATASTORE_NAMES;
}
- private DatastoreContext() {
+ DatastoreContext() {
setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
+ setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS);
setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
+ setSnapshotDataThreshold(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD);
setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
+ setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
}
private DatastoreContext(final DatastoreContext other) {
- this.dataStoreProperties = other.dataStoreProperties;
this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
this.operationTimeoutInMillis = other.operationTimeoutInMillis;
this.dataStoreMXBeanType = other.dataStoreMXBeanType;
this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
this.shardInitializationTimeout = other.shardInitializationTimeout;
this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
+ this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
this.persistent = other.persistent;
+ this.snapshotOnRootOverwrite = other.snapshotOnRootOverwrite;
this.configurationReader = other.configurationReader;
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
this.dataStoreName = other.dataStoreName;
this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
this.shardManagerPersistenceId = other.shardManagerPersistenceId;
this.useTellBasedProtocol = other.useTellBasedProtocol;
+ this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
+ this.requestTimeout = other.requestTimeout;
+ this.noProgressTimeout = other.noProgressTimeout;
+ this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
+ this.useLz4Compression = other.useLz4Compression;
+ this.exportOnRecovery = other.exportOnRecovery;
+ this.recoveryExportBaseDir = other.recoveryExportBaseDir;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
+ setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds());
setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
+ setSnapshotDataThreshold(other.raftConfig.getSnapshotDataThreshold());
setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
+ setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
return new Builder(new DatastoreContext(context));
}
- public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
- return dataStoreProperties;
- }
-
- public Duration getShardTransactionIdleTimeout() {
+ public FiniteDuration getShardTransactionIdleTimeout() {
return shardTransactionIdleTimeout;
}
return shardLeaderElectionTimeout;
}
+ /**
+ * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards
+ * on the local node to settle.
+ *
+ * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely.
+ */
+ public int getInitialSettleTimeoutMultiplier() {
+ return initialSettleTimeoutMultiplier;
+ }
+
public boolean isPersistent() {
return persistent;
}
+ public boolean isSnapshotOnRootOverwrite() {
+ return this.snapshotOnRootOverwrite;
+ }
+
public AkkaConfigurationReader getConfigurationReader() {
return configurationReader;
}
return shardManagerPersistenceId;
}
+ @Override
public String getTempFileDirectory() {
return raftConfig.getTempFileDirectory();
}
raftConfig.setTempFileDirectory(tempFileDirectory);
}
+ @Override
public int getFileBackedStreamingThreshold() {
return raftConfig.getFileBackedStreamingThreshold();
}
raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
}
+ private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
+ raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
+ }
+
private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
}
private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
- Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
- && shardSnapshotDataThresholdPercentage <= 100);
+ checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100);
raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
}
+ private void setSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
+ checkArgument(shardSnapshotDataThreshold >= 0);
+ raftConfig.setSnapshotDataThreshold(shardSnapshotDataThreshold);
+ }
+
private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
+ /**
+ * Set the interval in seconds after which a snapshot should be taken during the recovery process.
+ * 0 means don't take snapshots
+ */
+ private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
+ raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
+ }
+
@Deprecated
private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
// We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
return useTellBasedProtocol;
}
+ public boolean isUseLz4Compression() {
+ return useLz4Compression;
+ }
+
+ public ExportOnRecovery getExportOnRecovery() {
+ return exportOnRecovery;
+ }
+
+ public String getRecoveryExportBaseDir() {
+ return recoveryExportBaseDir;
+ }
+
+ @Override
public int getMaximumMessageSliceSize() {
return maximumMessageSliceSize;
}
+ @Override
+ public long getBackendAlivenessTimerInterval() {
+ return backendAlivenessTimerInterval;
+ }
+
+ @Override
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ @Override
+ public long getNoProgressTimeout() {
+ return noProgressTimeout;
+ }
+
+ public int getInitialPayloadSerializedBufferCapacity() {
+ return initialPayloadSerializedBufferCapacity;
+ }
+
public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
private final DatastoreContext datastoreContext;
- private int maxShardDataChangeExecutorPoolSize =
- InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
- private int maxShardDataChangeExecutorQueueSize =
- InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
- private int maxShardDataChangeListenerQueueSize =
- InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
- private int maxShardDataStoreExecutorQueueSize =
- InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
-
- private Builder(final DatastoreContext datastoreContext) {
- this.datastoreContext = datastoreContext;
- if (datastoreContext.getDataStoreProperties() != null) {
- maxShardDataChangeExecutorPoolSize =
- datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
- maxShardDataChangeExecutorQueueSize =
- datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
- maxShardDataChangeListenerQueueSize =
- datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
- maxShardDataStoreExecutorQueueSize =
- datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
- }
+ Builder(final DatastoreContext datastoreContext) {
+ this.datastoreContext = datastoreContext;
}
public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
- datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
+ datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
return this;
}
return this;
}
+ public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) {
+ checkArgument(recoverySnapshotIntervalSeconds >= 0);
+ datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds);
+ return this;
+ }
+
public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
return this;
}
+ public Builder shardSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
+ datastoreContext.setSnapshotDataThreshold(shardSnapshotDataThreshold);
+ return this;
+ }
+
public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
return this;
return this;
}
+ public Builder initialSettleTimeoutMultiplier(final int multiplier) {
+ checkArgument(multiplier >= 0);
+ datastoreContext.initialSettleTimeoutMultiplier = multiplier;
+ return this;
+ }
+
public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
}
return this;
}
+ public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) {
+ datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite;
+ return this;
+ }
+
public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
return this;
return this;
}
+ public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
+ datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
+ return this;
+ }
+
public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
return this;
}
public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
- datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
+ datastoreContext.logicalStoreType = requireNonNull(logicalStoreType);
// Retain compatible naming
switch (logicalStoreType) {
}
public Builder dataStoreName(final String dataStoreName) {
- datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
+ datastoreContext.dataStoreName = requireNonNull(dataStoreName);
datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
return this;
}
return this;
}
- public Builder maxShardDataChangeExecutorPoolSize(final int maxShardDataChangeExecutorPoolSize) {
- this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
- return this;
- }
-
- public Builder maxShardDataChangeExecutorQueueSize(final int maxShardDataChangeExecutorQueueSize) {
- this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
+ public Builder useTellBasedProtocol(final boolean value) {
+ datastoreContext.useTellBasedProtocol = value;
return this;
}
- public Builder maxShardDataChangeListenerQueueSize(final int maxShardDataChangeListenerQueueSize) {
- this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
+ public Builder useLz4Compression(final boolean value) {
+ datastoreContext.useLz4Compression = value;
return this;
}
- public Builder maxShardDataStoreExecutorQueueSize(final int maxShardDataStoreExecutorQueueSize) {
- this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
+ public Builder exportOnRecovery(final ExportOnRecovery value) {
+ datastoreContext.exportOnRecovery = value;
return this;
}
- public Builder useTellBasedProtocol(final boolean value) {
- datastoreContext.useTellBasedProtocol = value;
+ public Builder recoveryExportBaseDir(final String value) {
+ datastoreContext.recoveryExportBaseDir = value;
return this;
}
return this;
}
- public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
+ public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
return this;
}
return this;
}
+ public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
+ datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
+ return this;
+ }
+
+ public Builder frontendRequestTimeoutInSeconds(final long timeout) {
+ datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
+ return this;
+ }
+
+ public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
+ datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
+ return this;
+ }
+
+ public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
+ datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
+ return this;
+ }
+
@Override
public DatastoreContext build() {
- datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
- maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
- maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
-
if (datastoreContext.dataStoreName != null) {
GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
}