2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.TimeUnit;
18 import org.apache.commons.text.WordUtils;
19 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
21 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
22 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
23 import org.opendaylight.controller.cluster.raft.ConfigParams;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.duration.FiniteDuration;
34 * Contains contextual data for a data store.
36 * @author Thomas Pantelis
38 // Non-final for mocking
// Configuration holder for a single data store; instances are assembled through the nested Builder.
// The constants below are the default values used by the field initializers and the default setup calls.
39 public class DatastoreContext implements ClientActorConfig {
40 public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
42 public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
44 public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
45 public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
46 public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
47 public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
48 public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
49 public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
// The isolated-leader check default is derived from the heartbeat default (ten heartbeats).
50 public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
51 DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
52 public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
53 public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
54 public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
55 public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
56 public static final boolean DEFAULT_PERSISTENT = true;
57 public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false;
58 public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
59 public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
60 public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD = 0;
61 public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
62 public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
63 public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
// Data store name used until a name or logical store type is supplied through the Builder.
64 public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
65 public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
// Two minutes, expressed in milliseconds.
66 public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
67 TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
68 public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 480 * 1024; // 480KiB
69 public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
70 public static final ExportOnRecovery DEFAULT_EXPORT_ON_RECOVERY = ExportOnRecovery.Off;
71 public static final String DEFAULT_RECOVERY_EXPORT_BASE_DIR = "persistence-export";
73 public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
75 private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
// Static, concurrent set of data store names; Builder.build() adds the name of every context it returns.
77 private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
// Raft-related settings live in this object; the private set* methods of this class write into it.
79 private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
// Per-instance settings, each initialized to its default and changed only through the Builder
// or the copy constructor.
81 private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
82 private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
// No initializer here: assigned by Builder.dataStoreMXBeanType() or derived in Builder.dataStoreName().
83 private String dataStoreMXBeanType;
84 private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
85 private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
86 private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
87 private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
88 private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
89 private boolean persistent = DEFAULT_PERSISTENT;
90 private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE;
91 private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
92 private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
93 private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
94 private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
95 private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.of();
96 private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
97 private boolean writeOnlyTransactionOptimizationsEnabled = true;
98 private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
// Deprecated since 7.0.0 and marked for removal, like its accessor and Builder method.
99 @Deprecated(since = "7.0.0", forRemoval = true)
100 private boolean useTellBasedProtocol = true;
101 private boolean transactionDebugContextEnabled = false;
// No initializer here: only assigned through Builder.shardManagerPersistenceId().
102 private String shardManagerPersistenceId;
// Mirrors the snapshot chunk size held in raftConfig; see setMaximumMessageSliceSize().
103 private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
// The next three timers are stored in nanoseconds: the defaults are *_NANOS constants and the
// Builder converts its second-based arguments with TimeUnit.SECONDS.toNanos().
104 private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
105 private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
106 private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
107 private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
108 private boolean useLz4Compression = false;
109 private ExportOnRecovery exportOnRecovery = DEFAULT_EXPORT_ON_RECOVERY;
110 private String recoveryExportBaseDir = DEFAULT_RECOVERY_EXPORT_BASE_DIR;
// Returns the static set of data store names populated by Builder.build().
// NOTE(review): the set itself is returned, not a copy or read-only view, so callers can modify it -
// confirm that this is intended.
112 public static Set<String> getGlobalDatastoreNames() {
113 return GLOBAL_DATASTORE_NAMES;
// Applies the DEFAULT_* values to the Raft configuration through the private setters.
// NOTE(review): the declaration enclosing these calls is not visible in this view; presumably it is the
// no-argument constructor used by newBuilder() - confirm against the full file.
117 setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
118 setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
119 setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS);
120 setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
121 setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
122 setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
123 setSnapshotDataThreshold(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD);
124 setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
125 setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
126 setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
// Also writes the snapshot chunk size in raftConfig (see setMaximumMessageSliceSize).
127 setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
// Copy constructor: duplicates every setting of 'other' into this new instance.
// Plain fields are assigned directly; Raft settings are read from other.raftConfig and re-applied
// through the private setters, so this instance fills its own raftConfig object.
130 private DatastoreContext(final DatastoreContext other) {
131 shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
132 operationTimeoutInMillis = other.operationTimeoutInMillis;
133 dataStoreMXBeanType = other.dataStoreMXBeanType;
134 shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
135 shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
136 shardInitializationTimeout = other.shardInitializationTimeout;
137 shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
138 initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
139 persistent = other.persistent;
140 snapshotOnRootOverwrite = other.snapshotOnRootOverwrite;
141 configurationReader = other.configurationReader;
142 transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
143 dataStoreName = other.dataStoreName;
144 logicalStoreType = other.logicalStoreType;
145 storeRoot = other.storeRoot;
146 shardBatchedModificationCount = other.shardBatchedModificationCount;
147 writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
148 shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
149 transactionDebugContextEnabled = other.transactionDebugContextEnabled;
150 shardManagerPersistenceId = other.shardManagerPersistenceId;
151 useTellBasedProtocol = other.useTellBasedProtocol;
152 backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
153 requestTimeout = other.requestTimeout;
154 noProgressTimeout = other.noProgressTimeout;
155 initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
156 useLz4Compression = other.useLz4Compression;
157 exportOnRecovery = other.exportOnRecovery;
158 recoveryExportBaseDir = other.recoveryExportBaseDir;
160 setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
161 setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
162 setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds());
163 setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
164 setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
165 setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
166 setSnapshotDataThreshold(other.raftConfig.getSnapshotDataThreshold());
167 setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
168 setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
169 setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
// Order matters for the next two calls: the slice size must be set first because
// setShardSnapshotChunkSize() compares its argument against this.maximumMessageSliceSize.
170 setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
171 setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
172 setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
173 setTempFileDirectory(other.getTempFileDirectory());
174 setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
175 setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
// Creates a Builder that operates on a newly constructed context.
178 public static Builder newBuilder() {
179 return new Builder(new DatastoreContext());
// Creates a Builder that operates on a copy of 'context' (made by the copy constructor), so the
// Builder's changes are applied to the copy rather than to the argument.
182 public static Builder newBuilderFrom(final DatastoreContext context) {
183 return new Builder(new DatastoreContext(context));
// Returns the configured shard transaction idle timeout.
186 public FiniteDuration getShardTransactionIdleTimeout() {
187 return shardTransactionIdleTimeout;
// Returns the MXBean type string; may be null when neither Builder.dataStoreMXBeanType() nor
// Builder.dataStoreName() has been called, because the field has no initializer.
190 public String getDataStoreMXBeanType() {
191 return dataStoreMXBeanType;
// Returns the operation timeout in milliseconds.
194 public long getOperationTimeoutInMillis() {
195 return operationTimeoutInMillis;
// Accessor for the shard Raft configuration.
// NOTE(review): the body is not visible in this view; presumably it returns the raftConfig field - confirm.
198 public ConfigParams getShardRaftConfig() {
// Returns the shard transaction commit timeout in seconds.
202 public int getShardTransactionCommitTimeoutInSeconds() {
203 return shardTransactionCommitTimeoutInSeconds;
// Returns the configured shard transaction commit queue capacity.
206 public int getShardTransactionCommitQueueCapacity() {
207 return shardTransactionCommitQueueCapacity;
// Returns the configured shard initialization timeout.
210 public Timeout getShardInitializationTimeout() {
211 return shardInitializationTimeout;
// Returns the configured shard leader election timeout.
214 public Timeout getShardLeaderElectionTimeout() {
215 return shardLeaderElectionTimeout;
// The three lines below are the text of this accessor's Javadoc; its delimiters are not visible in this view.
// Negative values are rejected by the Builder method of the same name.
219 * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards
220 * on the local node to settle.
222 * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely.
224 public int getInitialSettleTimeoutMultiplier() {
225 return initialSettleTimeoutMultiplier;
// Accessor for the persistence flag.
// NOTE(review): the body is not visible in this view; presumably it returns the persistent field - confirm.
228 public boolean isPersistent() {
// Returns the snapshot-on-root-overwrite flag.
232 public boolean isSnapshotOnRootOverwrite() {
233 return snapshotOnRootOverwrite;
// Returns the Akka configuration reader (DEFAULT_CONFIGURATION_READER unless replaced via the Builder).
236 public AkkaConfigurationReader getConfigurationReader() {
237 return configurationReader;
// Returns the election timeout factor as stored in raftConfig.
240 public long getShardElectionTimeoutFactor() {
241 return raftConfig.getElectionTimeoutFactor();
// Returns the data store name (UNKNOWN_DATA_STORE_TYPE until set through the Builder).
244 public String getDataStoreName() {
245 return dataStoreName;
// Returns the logical store type (OPERATIONAL unless changed through the Builder).
248 public LogicalDatastoreType getLogicalStoreType() {
249 return logicalStoreType;
// Accessor for the store root identifier.
// NOTE(review): the body is not visible in this view; presumably it returns the storeRoot field - confirm.
252 public YangInstanceIdentifier getStoreRoot() {
// Returns the initial rate limit for transaction creation.
256 public long getTransactionCreationInitialRateLimit() {
257 return transactionCreationInitialRateLimit;
// Returns the shard manager persistence id; null unless set through the Builder (no field initializer).
260 public String getShardManagerPersistenceId() {
261 return shardManagerPersistenceId;
// Returns the temporary file directory as stored in raftConfig.
265 public String getTempFileDirectory() {
266 return raftConfig.getTempFileDirectory();
// Stores the temporary file directory in raftConfig.
269 private void setTempFileDirectory(final String tempFileDirectory) {
270 raftConfig.setTempFileDirectory(tempFileDirectory);
// Returns the file-backed streaming threshold as stored in raftConfig.
274 public int getFileBackedStreamingThreshold() {
275 return raftConfig.getFileBackedStreamingThreshold();
// Stores the file-backed streaming threshold in raftConfig. The value is passed through unchanged;
// the megabyte conversion happens in the Builder method before this setter is called.
278 private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
279 raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
// Stores the peer address resolver in raftConfig.
282 private void setPeerAddressResolver(final PeerAddressResolver resolver) {
283 raftConfig.setPeerAddressResolver(resolver);
// Stores the heartbeat interval in raftConfig, wrapping the millisecond value in a FiniteDuration.
286 private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
287 raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
288 TimeUnit.MILLISECONDS));
// Stores the journal recovery log batch size in raftConfig.
291 private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
292 raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
// Stores the isolated-leader check interval in raftConfig, wrapping the millisecond value in a FiniteDuration.
296 private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
297 raftConfig.setIsolatedLeaderCheckInterval(
298 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
// Stores the election timeout factor in raftConfig.
301 private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
302 raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
// Stores the candidate election timeout divisor in raftConfig.
305 private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
306 raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
// Stores the class name of a custom Raft policy implementation in raftConfig.
309 private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
310 raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
// Stores the snapshot data threshold percentage in raftConfig.
// checkArgument rejects values outside 0..100 (inclusive) with an IllegalArgumentException.
313 private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
314 checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100);
315 raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
// Stores the snapshot data threshold in raftConfig.
// checkArgument rejects negative values with an IllegalArgumentException.
318 private void setSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
319 checkArgument(shardSnapshotDataThreshold >= 0);
320 raftConfig.setSnapshotDataThreshold(shardSnapshotDataThreshold);
// Stores the snapshot batch count in raftConfig.
323 private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
324 raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
// The two lines below are the text of this setter's Javadoc; its delimiters are not visible in this view.
// No range check is done here; the Builder method rejects negative values before calling this setter.
328 * Set the interval in seconds after which a snapshot should be taken during the recovery process.
329 * 0 means don't take snapshots
331 private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
332 raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
// Applies the legacy shard snapshot chunk size to raftConfig, but only when it is strictly smaller than
// the current maximumMessageSliceSize. No other branch is visible in this view, so larger or equal
// values appear to leave the chunk size previously written by setMaximumMessageSliceSize() in place.
336 private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
337 // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
338 // maximumMessageSliceSize.
339 if (shardSnapshotChunkSize < maximumMessageSliceSize) {
340 raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
// Sets the maximum message slice size. The same value is written both to the Raft snapshot chunk size
// and to the local field, overwriting any chunk size set earlier by setShardSnapshotChunkSize().
344 private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
345 raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
346 this.maximumMessageSliceSize = maximumMessageSliceSize;
// Stores the sync index threshold in raftConfig.
349 private void setSyncIndexThreshold(final long syncIndexThreshold) {
350 raftConfig.setSyncIndexThreshold(syncIndexThreshold);
// Returns the configured shard batched modification count.
353 public int getShardBatchedModificationCount() {
354 return shardBatchedModificationCount;
// Returns the write-only transaction optimizations flag (true unless changed through the Builder).
357 public boolean isWriteOnlyTransactionOptimizationsEnabled() {
358 return writeOnlyTransactionOptimizationsEnabled;
// Returns the shard commit queue expiry timeout in milliseconds.
361 public long getShardCommitQueueExpiryTimeoutInMillis() {
362 return shardCommitQueueExpiryTimeoutInMillis;
// Returns the transaction debug context flag (false unless changed through the Builder).
365 public boolean isTransactionDebugContextEnabled() {
366 return transactionDebugContextEnabled;
// Returns the tell-based protocol flag. Deprecated since 7.0.0 and marked for removal.
369 @Deprecated(since = "7.0.0", forRemoval = true)
370 public boolean isUseTellBasedProtocol() {
371 return useTellBasedProtocol;
// Returns the LZ4 compression flag (false unless changed through the Builder).
374 public boolean isUseLz4Compression() {
375 return useLz4Compression;
// Returns the export-on-recovery mode (DEFAULT_EXPORT_ON_RECOVERY unless changed through the Builder).
378 public ExportOnRecovery getExportOnRecovery() {
379 return exportOnRecovery;
// Returns the base directory for recovery exports ("persistence-export" unless changed through the Builder).
382 public String getRecoveryExportBaseDir() {
383 return recoveryExportBaseDir;
// Returns the maximum message slice size held in the local field.
387 public int getMaximumMessageSliceSize() {
388 return maximumMessageSliceSize;
// Returns the backend aliveness timer interval. The value is in nanoseconds: the default is a *_NANOS
// constant and the Builder converts its seconds argument with TimeUnit.SECONDS.toNanos().
392 public long getBackendAlivenessTimerInterval() {
393 return backendAlivenessTimerInterval;
// Returns the frontend request timeout. The value is in nanoseconds: the default is a *_NANOS constant
// and the Builder converts its seconds argument with TimeUnit.SECONDS.toNanos().
397 public long getRequestTimeout() {
398 return requestTimeout;
// Returns the frontend no-progress timeout. The value is in nanoseconds: the default is a *_NANOS
// constant and the Builder converts its seconds argument with TimeUnit.SECONDS.toNanos().
402 public long getNoProgressTimeout() {
403 return noProgressTimeout;
// Returns the initial capacity used for payload serialization buffers.
406 public int getInitialPayloadSerializedBufferCapacity() {
407 return initialPayloadSerializedBufferCapacity;
// Builder for DatastoreContext. It does not collect values separately: every method writes straight
// into the single DatastoreContext instance held here.
410 public static class Builder {
411 private final DatastoreContext datastoreContext;
// Package-private constructor; keeps the given context as the instance this builder modifies.
// newBuilder() and newBuilderFrom() both pass a freshly constructed context.
413 Builder(final DatastoreContext datastoreContext) {
414 this.datastoreContext = datastoreContext;
// Accepts the bounded mailbox capacity; no use of the argument is visible in this view.
417 public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
418 // TODO - this is defined in the yang DataStoreProperties but not currently used.
// Accepts the metric capture flag; no use of the argument is visible in this view.
422 public Builder enableMetricCapture(final boolean enableMetricCapture) {
423 // TODO - this is defined in the yang DataStoreProperties but not currently used.
// Sets the shard transaction idle timeout from a value and its time unit.
428 public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
429 datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
// Convenience form of shardTransactionIdleTimeout() with the unit fixed to minutes.
433 public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
434 return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
// Sets the operation timeout from seconds; the value is stored in milliseconds.
437 public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
438 datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
// Sets the operation timeout directly in milliseconds.
442 public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
443 datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
// Sets the MXBean type string explicitly. A later call to dataStoreName() overwrites it with a
// value derived from the name.
447 public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
448 datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
// Sets the shard transaction commit timeout in seconds.
452 public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
453 datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
// Sets the journal recovery log batch size in the context's Raft configuration.
457 public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
458 datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
// Sets the snapshot batch count in the context's Raft configuration (the int is widened to long).
462 public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
463 datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
// Sets the recovery snapshot interval in seconds.
// checkArgument rejects negative values with an IllegalArgumentException.
467 public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) {
468 checkArgument(recoverySnapshotIntervalSeconds >= 0);
469 datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds);
// Sets the snapshot data threshold percentage; the context setter validates the 0..100 range.
473 public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
474 datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
// Sets the snapshot data threshold; the context setter rejects negative values.
478 public Builder shardSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
479 datastoreContext.setSnapshotDataThreshold(shardSnapshotDataThreshold);
// Sets the shard heartbeat interval in milliseconds.
483 public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
484 datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
// Sets the shard transaction commit queue capacity.
488 public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
489 datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
// Sets the shard initialization timeout from a value and its time unit.
493 public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
494 datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
// Convenience form of shardInitializationTimeout() with the unit fixed to seconds.
498 public Builder shardInitializationTimeoutInSeconds(final long timeout) {
499 return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
// Sets the shard leader election timeout from a value and its time unit.
502 public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
503 datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
// Sets the initial settle timeout multiplier.
// checkArgument rejects negative values with an IllegalArgumentException; zero is accepted.
507 public Builder initialSettleTimeoutMultiplier(final int multiplier) {
508 checkArgument(multiplier >= 0);
509 datastoreContext.initialSettleTimeoutMultiplier = multiplier;
// Convenience form of shardLeaderElectionTimeout() with the unit fixed to seconds.
513 public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
514 return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
// Replaces the Akka configuration reader. The argument is stored as given, without a null check.
517 public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
518 datastoreContext.configurationReader = configurationReader;
// Sets the persistence flag.
522 public Builder persistent(final boolean persistent) {
523 datastoreContext.persistent = persistent;
// Sets the snapshot-on-root-overwrite flag.
527 public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) {
528 datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite;
// Sets the isolated-leader check interval in milliseconds.
532 public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
533 datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
// Sets the election timeout factor in the context's Raft configuration.
537 public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
538 datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
// Sets the candidate election timeout divisor in the context's Raft configuration.
542 public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
543 datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
// Sets the initial rate limit for transaction creation.
547 public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
548 datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
// Sets the logical store type (null is rejected by requireNonNull) and also derives the data store
// name from it through dataStoreName(), which in turn sets the MXBean type.
// NOTE(review): the case labels of the switch are not visible in this view; the visible branches choose
// "config", "operational", or the enum constant's name() - confirm which store type maps to which name.
552 public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
553 datastoreContext.logicalStoreType = requireNonNull(logicalStoreType);
555 // Retain compatible naming
556 switch (logicalStoreType) {
558 dataStoreName("config");
561 dataStoreName("operational");
564 dataStoreName(logicalStoreType.name());
// Sets the store root identifier. The argument is stored as given, without a null check.
570 public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
571 datastoreContext.storeRoot = storeRoot;
// Sets the data store name (null is rejected by requireNonNull) and regenerates the MXBean type as
// "Distributed" + capitalized name + "Datastore", replacing any MXBean type set earlier.
575 public Builder dataStoreName(final String dataStoreName) {
576 datastoreContext.dataStoreName = requireNonNull(dataStoreName);
577 datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
// Sets the shard batched modification count.
581 public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
582 datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
// Sets the write-only transaction optimizations flag.
586 public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
587 datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
// Sets the shard commit queue expiry timeout directly in milliseconds.
591 public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
592 datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
// Sets the shard commit queue expiry timeout from seconds; the value is stored in milliseconds.
596 public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
597 datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
598 value, TimeUnit.SECONDS);
// Sets the transaction debug context flag.
602 public Builder transactionDebugContextEnabled(final boolean value) {
603 datastoreContext.transactionDebugContextEnabled = value;
// Sets the tell-based protocol flag. Deprecated since 7.0.0 and marked for removal.
607 @Deprecated(since = "7.0.0", forRemoval = true)
608 public Builder useTellBasedProtocol(final boolean value) {
609 datastoreContext.useTellBasedProtocol = value;
// Sets the LZ4 compression flag.
613 public Builder useLz4Compression(final boolean value) {
614 datastoreContext.useLz4Compression = value;
// Sets the export-on-recovery mode. The argument is stored as given, without a null check.
618 public Builder exportOnRecovery(final ExportOnRecovery value) {
619 datastoreContext.exportOnRecovery = value;
// Sets the base directory for recovery exports. The argument is stored as given, without a null check.
623 public Builder recoveryExportBaseDir(final String value) {
624 datastoreContext.recoveryExportBaseDir = value;
// Sets the shard manager persistence id. The line below is the text of the method's Javadoc; its
// delimiters are not visible in this view.
629 * For unit tests only.
632 public Builder shardManagerPersistenceId(final String id) {
633 datastoreContext.shardManagerPersistenceId = id;
// Sets the class name of a custom Raft policy implementation in the context's Raft configuration.
637 public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
638 datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
// Legacy setter for the snapshot chunk size. It logs a deprecation warning on every call and then
// hands the value to the context, which applies it only when it is below the maximum message slice size.
643 public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
644 LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
645 + "use maximum-message-slice-size instead");
646 datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
// Sets the maximum message slice size; the context setter also writes it as the Raft snapshot chunk size.
650 public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
651 datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
// Sets the peer address resolver in the context's Raft configuration.
655 public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
656 datastoreContext.setPeerAddressResolver(resolver);
// Sets the temporary file directory in the context's Raft configuration.
660 public Builder tempFileDirectory(final String tempFileDirectory) {
661 datastoreContext.setTempFileDirectory(tempFileDirectory);
// Sets the file-backed streaming threshold from a megabyte count, scaling it by ConfigParams.MEGABYTE.
// NOTE(review): the product is passed to an int parameter, so a large enough megabyte count could
// overflow int arithmetic without any error - confirm the expected input range.
665 public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
666 datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
// Sets the sync index threshold in the context's Raft configuration.
670 public Builder syncIndexThreshold(final long syncIndexThreshold) {
671 datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
// Sets the backend aliveness timer interval from seconds; the value is stored in nanoseconds.
675 public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
676 datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
// Sets the frontend request timeout from seconds; the value is stored in nanoseconds.
680 public Builder frontendRequestTimeoutInSeconds(final long timeout) {
681 datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
// Sets the frontend no-progress timeout from seconds; the value is stored in nanoseconds.
685 public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
686 datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
// Sets the initial capacity used for payload serialization buffers.
690 public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
691 datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
// Finishes the build: records the data store name in the static GLOBAL_DATASTORE_NAMES set (when the
// name is non-null) and returns the context.
// NOTE(review): the object returned is the builder's own instance, not a copy, so further calls on this
// builder would change the context that was already handed out - confirm callers do not reuse builders.
695 public DatastoreContext build() {
696 if (datastoreContext.dataStoreName != null) {
697 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
700 return datastoreContext;