/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.util.Timeout;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.TimeUnit;
16 import org.apache.commons.text.WordUtils;
17 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
18 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
19 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
20 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
21 import org.opendaylight.controller.cluster.raft.ConfigParams;
22 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
23 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
24 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
25 import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStoreConfigProperties;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.duration.FiniteDuration;
/**
 * Contains contextual data for a data store.
 *
 * @author Thomas Pantelis
 */
36 // Non-final for mocking
37 public class DatastoreContext implements ClientActorConfig {
38 public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
40 public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
42 public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
43 public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
44 public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
45 public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
46 public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
47 public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
48 DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
49 public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
50 public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
51 public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
52 public static final boolean DEFAULT_PERSISTENT = true;
53 public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
54 public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
55 public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
56 public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
57 public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
58 public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
59 public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
60 public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
61 TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
62 public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
63 public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
65 public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
67 private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
69 private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
71 private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
73 private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
74 private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
75 private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
76 private String dataStoreMXBeanType;
77 private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
78 private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
79 private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
80 private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
81 private boolean persistent = DEFAULT_PERSISTENT;
82 private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
83 private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
84 private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
85 private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
86 private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
87 private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
88 private boolean writeOnlyTransactionOptimizationsEnabled = true;
89 private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
90 private boolean useTellBasedProtocol = false;
91 private boolean transactionDebugContextEnabled = false;
92 private String shardManagerPersistenceId;
93 private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
94 private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
95 private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
96 private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
97 private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
99 public static Set<String> getGlobalDatastoreNames() {
100 return GLOBAL_DATASTORE_NAMES;
/**
 * Creates a context holding the {@code DEFAULT_*} values. The Raft-related defaults are written into
 * {@code raftConfig} through the private setters.
 */
private DatastoreContext() {
    // Journal recovery and snapshotting.
    setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
    setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
    setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);

    // Heartbeat and election timing.
    setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
    setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
    setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
    setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);

    // Replication and message slicing.
    setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
    setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
}
115 private DatastoreContext(final DatastoreContext other) {
116 this.dataStoreProperties = other.dataStoreProperties;
117 this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
118 this.operationTimeoutInMillis = other.operationTimeoutInMillis;
119 this.dataStoreMXBeanType = other.dataStoreMXBeanType;
120 this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
121 this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
122 this.shardInitializationTimeout = other.shardInitializationTimeout;
123 this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
124 this.persistent = other.persistent;
125 this.configurationReader = other.configurationReader;
126 this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
127 this.dataStoreName = other.dataStoreName;
128 this.logicalStoreType = other.logicalStoreType;
129 this.storeRoot = other.storeRoot;
130 this.shardBatchedModificationCount = other.shardBatchedModificationCount;
131 this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
132 this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
133 this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
134 this.shardManagerPersistenceId = other.shardManagerPersistenceId;
135 this.useTellBasedProtocol = other.useTellBasedProtocol;
136 this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
137 this.requestTimeout = other.requestTimeout;
138 this.noProgressTimeout = other.noProgressTimeout;
139 this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
141 setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
142 setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
143 setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
144 setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
145 setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
146 setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
147 setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
148 setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
149 setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
150 setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
151 setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
152 setTempFileDirectory(other.getTempFileDirectory());
153 setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
154 setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
157 public static Builder newBuilder() {
158 return new Builder(new DatastoreContext());
161 public static Builder newBuilderFrom(final DatastoreContext context) {
162 return new Builder(new DatastoreContext(context));
165 public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
166 return dataStoreProperties;
169 public FiniteDuration getShardTransactionIdleTimeout() {
170 return shardTransactionIdleTimeout;
173 public String getDataStoreMXBeanType() {
174 return dataStoreMXBeanType;
177 public long getOperationTimeoutInMillis() {
178 return operationTimeoutInMillis;
181 public ConfigParams getShardRaftConfig() {
185 public int getShardTransactionCommitTimeoutInSeconds() {
186 return shardTransactionCommitTimeoutInSeconds;
189 public int getShardTransactionCommitQueueCapacity() {
190 return shardTransactionCommitQueueCapacity;
193 public Timeout getShardInitializationTimeout() {
194 return shardInitializationTimeout;
197 public Timeout getShardLeaderElectionTimeout() {
198 return shardLeaderElectionTimeout;
201 public boolean isPersistent() {
205 public AkkaConfigurationReader getConfigurationReader() {
206 return configurationReader;
209 public long getShardElectionTimeoutFactor() {
210 return raftConfig.getElectionTimeoutFactor();
213 public String getDataStoreName() {
214 return dataStoreName;
217 public LogicalDatastoreType getLogicalStoreType() {
218 return logicalStoreType;
221 public YangInstanceIdentifier getStoreRoot() {
225 public long getTransactionCreationInitialRateLimit() {
226 return transactionCreationInitialRateLimit;
229 public String getShardManagerPersistenceId() {
230 return shardManagerPersistenceId;
234 public String getTempFileDirectory() {
235 return raftConfig.getTempFileDirectory();
238 private void setTempFileDirectory(final String tempFileDirectory) {
239 raftConfig.setTempFileDirectory(tempFileDirectory);
243 public int getFileBackedStreamingThreshold() {
244 return raftConfig.getFileBackedStreamingThreshold();
247 private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
248 raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
251 private void setPeerAddressResolver(final PeerAddressResolver resolver) {
252 raftConfig.setPeerAddressResolver(resolver);
255 private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
256 raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
257 TimeUnit.MILLISECONDS));
260 private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
261 raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
265 private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
266 raftConfig.setIsolatedLeaderCheckInterval(
267 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
270 private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
271 raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
274 private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
275 raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
278 private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
279 raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
282 private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
283 Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
284 && shardSnapshotDataThresholdPercentage <= 100);
285 raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
288 private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
289 raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
293 private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
294 // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
295 // maximumMessageSliceSize.
296 if (shardSnapshotChunkSize < maximumMessageSliceSize) {
297 raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
301 private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
302 raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
303 this.maximumMessageSliceSize = maximumMessageSliceSize;
306 private void setSyncIndexThreshold(final long syncIndexThreshold) {
307 raftConfig.setSyncIndexThreshold(syncIndexThreshold);
310 public int getShardBatchedModificationCount() {
311 return shardBatchedModificationCount;
314 public boolean isWriteOnlyTransactionOptimizationsEnabled() {
315 return writeOnlyTransactionOptimizationsEnabled;
318 public long getShardCommitQueueExpiryTimeoutInMillis() {
319 return shardCommitQueueExpiryTimeoutInMillis;
322 public boolean isTransactionDebugContextEnabled() {
323 return transactionDebugContextEnabled;
326 public boolean isUseTellBasedProtocol() {
327 return useTellBasedProtocol;
331 public int getMaximumMessageSliceSize() {
332 return maximumMessageSliceSize;
336 public long getBackendAlivenessTimerInterval() {
337 return backendAlivenessTimerInterval;
341 public long getRequestTimeout() {
342 return requestTimeout;
346 public long getNoProgressTimeout() {
347 return noProgressTimeout;
350 public int getInitialPayloadSerializedBufferCapacity() {
351 return initialPayloadSerializedBufferCapacity;
354 public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
355 private final DatastoreContext datastoreContext;
356 private int maxShardDataChangeExecutorPoolSize =
357 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
358 private int maxShardDataChangeExecutorQueueSize =
359 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
360 private int maxShardDataChangeListenerQueueSize =
361 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
362 private int maxShardDataStoreExecutorQueueSize =
363 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
365 Builder(final DatastoreContext datastoreContext) {
366 this.datastoreContext = datastoreContext;
368 if (datastoreContext.getDataStoreProperties() != null) {
369 maxShardDataChangeExecutorPoolSize =
370 datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
371 maxShardDataChangeExecutorQueueSize =
372 datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
373 maxShardDataChangeListenerQueueSize =
374 datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
375 maxShardDataStoreExecutorQueueSize =
376 datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
380 public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
381 // TODO - this is defined in the yang DataStoreProperties but not currently used.
385 public Builder enableMetricCapture(final boolean enableMetricCapture) {
386 // TODO - this is defined in the yang DataStoreProperties but not currently used.
391 public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
392 datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
396 public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
397 return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
400 public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
401 datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
405 public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
406 datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
410 public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
411 datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
415 public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
416 datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
420 public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
421 datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
425 public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
426 datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
430 public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
431 datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
435 public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
436 datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
440 public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
441 datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
445 public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
446 datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
450 public Builder shardInitializationTimeoutInSeconds(final long timeout) {
451 return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
454 public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
455 datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
459 public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
460 return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
463 public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
464 datastoreContext.configurationReader = configurationReader;
468 public Builder persistent(final boolean persistent) {
469 datastoreContext.persistent = persistent;
473 public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
474 datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
478 public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
479 datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
483 public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
484 datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
488 public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
489 datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
493 public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
494 datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
496 // Retain compatible naming
497 switch (logicalStoreType) {
499 dataStoreName("config");
502 dataStoreName("operational");
505 dataStoreName(logicalStoreType.name());
511 public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
512 datastoreContext.storeRoot = storeRoot;
516 public Builder dataStoreName(final String dataStoreName) {
517 datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
518 datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
522 public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
523 datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
527 public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
528 datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
532 public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
533 datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
537 public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
538 datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
539 value, TimeUnit.SECONDS);
543 public Builder transactionDebugContextEnabled(final boolean value) {
544 datastoreContext.transactionDebugContextEnabled = value;
548 public Builder maxShardDataChangeExecutorPoolSize(final int newMaxShardDataChangeExecutorPoolSize) {
549 this.maxShardDataChangeExecutorPoolSize = newMaxShardDataChangeExecutorPoolSize;
553 public Builder maxShardDataChangeExecutorQueueSize(final int newMaxShardDataChangeExecutorQueueSize) {
554 this.maxShardDataChangeExecutorQueueSize = newMaxShardDataChangeExecutorQueueSize;
558 public Builder maxShardDataChangeListenerQueueSize(final int newMaxShardDataChangeListenerQueueSize) {
559 this.maxShardDataChangeListenerQueueSize = newMaxShardDataChangeListenerQueueSize;
563 public Builder maxShardDataStoreExecutorQueueSize(final int newMaxShardDataStoreExecutorQueueSize) {
564 this.maxShardDataStoreExecutorQueueSize = newMaxShardDataStoreExecutorQueueSize;
568 public Builder useTellBasedProtocol(final boolean value) {
569 datastoreContext.useTellBasedProtocol = value;
574 * For unit tests only.
577 public Builder shardManagerPersistenceId(final String id) {
578 datastoreContext.shardManagerPersistenceId = id;
582 public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
583 datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
588 public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
589 LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
590 + "use maximum-message-slice-size instead");
591 datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
595 public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
596 datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
600 public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
601 datastoreContext.setPeerAddressResolver(resolver);
605 public Builder tempFileDirectory(final String tempFileDirectory) {
606 datastoreContext.setTempFileDirectory(tempFileDirectory);
610 public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
611 datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
615 public Builder syncIndexThreshold(final long syncIndexThreshold) {
616 datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
620 public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
621 datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
625 public Builder frontendRequestTimeoutInSeconds(final long timeout) {
626 datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
630 public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
631 datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
635 public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
636 datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
641 public DatastoreContext build() {
642 datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.builder()
643 .maxDataChangeExecutorPoolSize(maxShardDataChangeExecutorPoolSize)
644 .maxDataChangeExecutorQueueSize(maxShardDataChangeExecutorQueueSize)
645 .maxDataChangeListenerQueueSize(maxShardDataChangeListenerQueueSize)
646 .maxDataStoreExecutorQueueSize(maxShardDataStoreExecutorQueueSize)
649 if (datastoreContext.dataStoreName != null) {
650 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
653 return datastoreContext;