/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.TimeUnit;
18 import org.apache.commons.text.WordUtils;
19 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
21 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
22 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
23 import org.opendaylight.controller.cluster.raft.ConfigParams;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStoreConfigProperties;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.duration.FiniteDuration;
/**
 * Contains contextual data for a data store.
 *
 * @author Thomas Pantelis
 */
// Non-final for mocking
39 public class DatastoreContext implements ClientActorConfig {
40 public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
42 public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
44 public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
45 public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
46 public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
47 public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
48 public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
49 public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
50 DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
51 public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
52 public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
53 public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
54 public static final boolean DEFAULT_PERSISTENT = true;
55 public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
56 public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
57 public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
58 public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
59 public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
60 public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
61 public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
62 public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
63 TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
64 public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
65 public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
67 public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
69 private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
71 private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
73 private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
75 private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
76 private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
77 private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
78 private String dataStoreMXBeanType;
79 private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
80 private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
81 private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
82 private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
83 private boolean persistent = DEFAULT_PERSISTENT;
84 private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
85 private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
86 private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
87 private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
88 private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty();
89 private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
90 private boolean writeOnlyTransactionOptimizationsEnabled = true;
91 private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
92 private boolean useTellBasedProtocol = false;
93 private boolean transactionDebugContextEnabled = false;
94 private String shardManagerPersistenceId;
95 private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
96 private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
97 private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
98 private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
99 private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
101 public static Set<String> getGlobalDatastoreNames() {
102 return GLOBAL_DATASTORE_NAMES;
106 setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
107 setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
108 setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
109 setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
110 setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
111 setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
112 setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
113 setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
114 setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
117 private DatastoreContext(final DatastoreContext other) {
118 this.dataStoreProperties = other.dataStoreProperties;
119 this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
120 this.operationTimeoutInMillis = other.operationTimeoutInMillis;
121 this.dataStoreMXBeanType = other.dataStoreMXBeanType;
122 this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
123 this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
124 this.shardInitializationTimeout = other.shardInitializationTimeout;
125 this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
126 this.persistent = other.persistent;
127 this.configurationReader = other.configurationReader;
128 this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
129 this.dataStoreName = other.dataStoreName;
130 this.logicalStoreType = other.logicalStoreType;
131 this.storeRoot = other.storeRoot;
132 this.shardBatchedModificationCount = other.shardBatchedModificationCount;
133 this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
134 this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
135 this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
136 this.shardManagerPersistenceId = other.shardManagerPersistenceId;
137 this.useTellBasedProtocol = other.useTellBasedProtocol;
138 this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
139 this.requestTimeout = other.requestTimeout;
140 this.noProgressTimeout = other.noProgressTimeout;
141 this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
143 setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
144 setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
145 setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
146 setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
147 setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
148 setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
149 setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
150 setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
151 setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
152 setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
153 setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
154 setTempFileDirectory(other.getTempFileDirectory());
155 setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
156 setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
159 public static Builder newBuilder() {
160 return new Builder(new DatastoreContext());
163 public static Builder newBuilderFrom(final DatastoreContext context) {
164 return new Builder(new DatastoreContext(context));
167 public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
168 return dataStoreProperties;
171 public FiniteDuration getShardTransactionIdleTimeout() {
172 return shardTransactionIdleTimeout;
175 public String getDataStoreMXBeanType() {
176 return dataStoreMXBeanType;
179 public long getOperationTimeoutInMillis() {
180 return operationTimeoutInMillis;
183 public ConfigParams getShardRaftConfig() {
187 public int getShardTransactionCommitTimeoutInSeconds() {
188 return shardTransactionCommitTimeoutInSeconds;
191 public int getShardTransactionCommitQueueCapacity() {
192 return shardTransactionCommitQueueCapacity;
195 public Timeout getShardInitializationTimeout() {
196 return shardInitializationTimeout;
199 public Timeout getShardLeaderElectionTimeout() {
200 return shardLeaderElectionTimeout;
203 public boolean isPersistent() {
207 public AkkaConfigurationReader getConfigurationReader() {
208 return configurationReader;
211 public long getShardElectionTimeoutFactor() {
212 return raftConfig.getElectionTimeoutFactor();
215 public String getDataStoreName() {
216 return dataStoreName;
219 public LogicalDatastoreType getLogicalStoreType() {
220 return logicalStoreType;
223 public YangInstanceIdentifier getStoreRoot() {
227 public long getTransactionCreationInitialRateLimit() {
228 return transactionCreationInitialRateLimit;
231 public String getShardManagerPersistenceId() {
232 return shardManagerPersistenceId;
236 public String getTempFileDirectory() {
237 return raftConfig.getTempFileDirectory();
240 private void setTempFileDirectory(final String tempFileDirectory) {
241 raftConfig.setTempFileDirectory(tempFileDirectory);
245 public int getFileBackedStreamingThreshold() {
246 return raftConfig.getFileBackedStreamingThreshold();
249 private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
250 raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
253 private void setPeerAddressResolver(final PeerAddressResolver resolver) {
254 raftConfig.setPeerAddressResolver(resolver);
257 private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
258 raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
259 TimeUnit.MILLISECONDS));
262 private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
263 raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
267 private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
268 raftConfig.setIsolatedLeaderCheckInterval(
269 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
272 private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
273 raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
276 private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
277 raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
280 private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
281 raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
284 private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
285 checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100);
286 raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
289 private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
290 raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
294 private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
295 // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
296 // maximumMessageSliceSize.
297 if (shardSnapshotChunkSize < maximumMessageSliceSize) {
298 raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
302 private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
303 raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
304 this.maximumMessageSliceSize = maximumMessageSliceSize;
307 private void setSyncIndexThreshold(final long syncIndexThreshold) {
308 raftConfig.setSyncIndexThreshold(syncIndexThreshold);
311 public int getShardBatchedModificationCount() {
312 return shardBatchedModificationCount;
315 public boolean isWriteOnlyTransactionOptimizationsEnabled() {
316 return writeOnlyTransactionOptimizationsEnabled;
319 public long getShardCommitQueueExpiryTimeoutInMillis() {
320 return shardCommitQueueExpiryTimeoutInMillis;
323 public boolean isTransactionDebugContextEnabled() {
324 return transactionDebugContextEnabled;
327 public boolean isUseTellBasedProtocol() {
328 return useTellBasedProtocol;
332 public int getMaximumMessageSliceSize() {
333 return maximumMessageSliceSize;
337 public long getBackendAlivenessTimerInterval() {
338 return backendAlivenessTimerInterval;
342 public long getRequestTimeout() {
343 return requestTimeout;
347 public long getNoProgressTimeout() {
348 return noProgressTimeout;
351 public int getInitialPayloadSerializedBufferCapacity() {
352 return initialPayloadSerializedBufferCapacity;
355 public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
356 private final DatastoreContext datastoreContext;
357 private int maxShardDataChangeExecutorPoolSize =
358 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
359 private int maxShardDataChangeExecutorQueueSize =
360 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
361 private int maxShardDataChangeListenerQueueSize =
362 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
363 private int maxShardDataStoreExecutorQueueSize =
364 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
366 Builder(final DatastoreContext datastoreContext) {
367 this.datastoreContext = datastoreContext;
369 if (datastoreContext.getDataStoreProperties() != null) {
370 maxShardDataChangeExecutorPoolSize =
371 datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
372 maxShardDataChangeExecutorQueueSize =
373 datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
374 maxShardDataChangeListenerQueueSize =
375 datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
376 maxShardDataStoreExecutorQueueSize =
377 datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
381 public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
382 // TODO - this is defined in the yang DataStoreProperties but not currently used.
386 public Builder enableMetricCapture(final boolean enableMetricCapture) {
387 // TODO - this is defined in the yang DataStoreProperties but not currently used.
392 public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
393 datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
397 public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
398 return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
401 public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
402 datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
406 public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
407 datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
411 public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
412 datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
416 public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
417 datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
421 public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
422 datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
426 public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
427 datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
431 public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
432 datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
436 public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
437 datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
441 public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
442 datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
446 public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
447 datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
451 public Builder shardInitializationTimeoutInSeconds(final long timeout) {
452 return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
455 public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
456 datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
460 public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
461 return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
464 public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
465 datastoreContext.configurationReader = configurationReader;
469 public Builder persistent(final boolean persistent) {
470 datastoreContext.persistent = persistent;
474 public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
475 datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
479 public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
480 datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
484 public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
485 datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
489 public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
490 datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
494 public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
495 datastoreContext.logicalStoreType = requireNonNull(logicalStoreType);
497 // Retain compatible naming
498 switch (logicalStoreType) {
500 dataStoreName("config");
503 dataStoreName("operational");
506 dataStoreName(logicalStoreType.name());
512 public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
513 datastoreContext.storeRoot = storeRoot;
517 public Builder dataStoreName(final String dataStoreName) {
518 datastoreContext.dataStoreName = requireNonNull(dataStoreName);
519 datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
523 public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
524 datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
528 public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
529 datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
533 public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
534 datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
538 public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
539 datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
540 value, TimeUnit.SECONDS);
544 public Builder transactionDebugContextEnabled(final boolean value) {
545 datastoreContext.transactionDebugContextEnabled = value;
549 public Builder maxShardDataChangeExecutorPoolSize(final int newMaxShardDataChangeExecutorPoolSize) {
550 this.maxShardDataChangeExecutorPoolSize = newMaxShardDataChangeExecutorPoolSize;
554 public Builder maxShardDataChangeExecutorQueueSize(final int newMaxShardDataChangeExecutorQueueSize) {
555 this.maxShardDataChangeExecutorQueueSize = newMaxShardDataChangeExecutorQueueSize;
559 public Builder maxShardDataChangeListenerQueueSize(final int newMaxShardDataChangeListenerQueueSize) {
560 this.maxShardDataChangeListenerQueueSize = newMaxShardDataChangeListenerQueueSize;
564 public Builder maxShardDataStoreExecutorQueueSize(final int newMaxShardDataStoreExecutorQueueSize) {
565 this.maxShardDataStoreExecutorQueueSize = newMaxShardDataStoreExecutorQueueSize;
569 public Builder useTellBasedProtocol(final boolean value) {
570 datastoreContext.useTellBasedProtocol = value;
575 * For unit tests only.
578 public Builder shardManagerPersistenceId(final String id) {
579 datastoreContext.shardManagerPersistenceId = id;
583 public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
584 datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
589 public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
590 LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
591 + "use maximum-message-slice-size instead");
592 datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
596 public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
597 datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
601 public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
602 datastoreContext.setPeerAddressResolver(resolver);
606 public Builder tempFileDirectory(final String tempFileDirectory) {
607 datastoreContext.setTempFileDirectory(tempFileDirectory);
611 public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
612 datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
616 public Builder syncIndexThreshold(final long syncIndexThreshold) {
617 datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
621 public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
622 datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
626 public Builder frontendRequestTimeoutInSeconds(final long timeout) {
627 datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
631 public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
632 datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
636 public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
637 datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
642 public DatastoreContext build() {
643 datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.builder()
644 .maxDataChangeExecutorPoolSize(maxShardDataChangeExecutorPoolSize)
645 .maxDataChangeExecutorQueueSize(maxShardDataChangeExecutorQueueSize)
646 .maxDataChangeListenerQueueSize(maxShardDataChangeListenerQueueSize)
647 .maxDataStoreExecutorQueueSize(maxShardDataStoreExecutorQueueSize)
650 if (datastoreContext.dataStoreName != null) {
651 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
654 return datastoreContext;