Obsolete ask-based protocol
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.TimeUnit;
18 import org.apache.commons.text.WordUtils;
19 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
21 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
22 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
23 import org.opendaylight.controller.cluster.raft.ConfigParams;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.duration.FiniteDuration;
32
33 /**
34  * Contains contextual data for a data store.
35  *
36  * @author Thomas Pantelis
37  */
38 // Non-final for mocking
39 public class DatastoreContext implements ClientActorConfig {
40     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
41
42     public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
43         TimeUnit.MINUTES);
44     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
45     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
46     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
47     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
48     public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
49     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
50     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
51             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
52     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
53     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
54     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
55     public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
56     public static final boolean DEFAULT_PERSISTENT = true;
57     public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false;
58     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
59     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
60     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD = 0;
61     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
62     public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
63     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
64     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
65     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
66     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
67             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
68     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 480 * 1024; // 480KiB
69     public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
70     public static final ExportOnRecovery DEFAULT_EXPORT_ON_RECOVERY = ExportOnRecovery.Off;
71     public static final String DEFAULT_RECOVERY_EXPORT_BASE_DIR = "persistence-export";
72
73     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
74
75     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
76
77     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
78
79     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
80
81     private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
82     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
83     private String dataStoreMXBeanType;
84     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
85     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
86     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
87     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
88     private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
89     private boolean persistent = DEFAULT_PERSISTENT;
90     private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE;
91     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
92     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
93     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
94     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
95     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty();
96     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
97     private boolean writeOnlyTransactionOptimizationsEnabled = true;
98     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
99     @Deprecated(since = "7.0.0", forRemoval = true)
100     private boolean useTellBasedProtocol = true;
101     private boolean transactionDebugContextEnabled = false;
102     private String shardManagerPersistenceId;
103     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
104     private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
105     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
106     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
107     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
108     private boolean useLz4Compression = false;
109     private ExportOnRecovery exportOnRecovery = DEFAULT_EXPORT_ON_RECOVERY;
110     private String recoveryExportBaseDir = DEFAULT_RECOVERY_EXPORT_BASE_DIR;
111
112     public static Set<String> getGlobalDatastoreNames() {
113         return GLOBAL_DATASTORE_NAMES;
114     }
115
116     DatastoreContext() {
117         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
118         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
119         setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS);
120         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
121         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
122         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
123         setSnapshotDataThreshold(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD);
124         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
125         setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
126         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
127         setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
128     }
129
130     private DatastoreContext(final DatastoreContext other) {
131         shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
132         operationTimeoutInMillis = other.operationTimeoutInMillis;
133         dataStoreMXBeanType = other.dataStoreMXBeanType;
134         shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
135         shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
136         shardInitializationTimeout = other.shardInitializationTimeout;
137         shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
138         initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
139         persistent = other.persistent;
140         snapshotOnRootOverwrite = other.snapshotOnRootOverwrite;
141         configurationReader = other.configurationReader;
142         transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
143         dataStoreName = other.dataStoreName;
144         logicalStoreType = other.logicalStoreType;
145         storeRoot = other.storeRoot;
146         shardBatchedModificationCount = other.shardBatchedModificationCount;
147         writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
148         shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
149         transactionDebugContextEnabled = other.transactionDebugContextEnabled;
150         shardManagerPersistenceId = other.shardManagerPersistenceId;
151         useTellBasedProtocol = other.useTellBasedProtocol;
152         backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
153         requestTimeout = other.requestTimeout;
154         noProgressTimeout = other.noProgressTimeout;
155         initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
156         useLz4Compression = other.useLz4Compression;
157         exportOnRecovery = other.exportOnRecovery;
158         recoveryExportBaseDir = other.recoveryExportBaseDir;
159
160         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
161         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
162         setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds());
163         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
164         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
165         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
166         setSnapshotDataThreshold(other.raftConfig.getSnapshotDataThreshold());
167         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
168         setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
169         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
170         setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
171         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
172         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
173         setTempFileDirectory(other.getTempFileDirectory());
174         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
175         setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
176     }
177
178     public static Builder newBuilder() {
179         return new Builder(new DatastoreContext());
180     }
181
182     public static Builder newBuilderFrom(final DatastoreContext context) {
183         return new Builder(new DatastoreContext(context));
184     }
185
186     public FiniteDuration getShardTransactionIdleTimeout() {
187         return shardTransactionIdleTimeout;
188     }
189
190     public String getDataStoreMXBeanType() {
191         return dataStoreMXBeanType;
192     }
193
194     public long getOperationTimeoutInMillis() {
195         return operationTimeoutInMillis;
196     }
197
198     public ConfigParams getShardRaftConfig() {
199         return raftConfig;
200     }
201
202     public int getShardTransactionCommitTimeoutInSeconds() {
203         return shardTransactionCommitTimeoutInSeconds;
204     }
205
206     public int getShardTransactionCommitQueueCapacity() {
207         return shardTransactionCommitQueueCapacity;
208     }
209
210     public Timeout getShardInitializationTimeout() {
211         return shardInitializationTimeout;
212     }
213
214     public Timeout getShardLeaderElectionTimeout() {
215         return shardLeaderElectionTimeout;
216     }
217
218     /**
219      * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards
220      * on the local node to settle.
221      *
222      * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely.
223      */
224     public int getInitialSettleTimeoutMultiplier() {
225         return initialSettleTimeoutMultiplier;
226     }
227
228     public boolean isPersistent() {
229         return persistent;
230     }
231
232     public boolean isSnapshotOnRootOverwrite() {
233         return snapshotOnRootOverwrite;
234     }
235
236     public AkkaConfigurationReader getConfigurationReader() {
237         return configurationReader;
238     }
239
240     public long getShardElectionTimeoutFactor() {
241         return raftConfig.getElectionTimeoutFactor();
242     }
243
244     public String getDataStoreName() {
245         return dataStoreName;
246     }
247
248     public LogicalDatastoreType getLogicalStoreType() {
249         return logicalStoreType;
250     }
251
252     public YangInstanceIdentifier getStoreRoot() {
253         return storeRoot;
254     }
255
256     public long getTransactionCreationInitialRateLimit() {
257         return transactionCreationInitialRateLimit;
258     }
259
260     public String getShardManagerPersistenceId() {
261         return shardManagerPersistenceId;
262     }
263
264     @Override
265     public String getTempFileDirectory() {
266         return raftConfig.getTempFileDirectory();
267     }
268
269     private void setTempFileDirectory(final String tempFileDirectory) {
270         raftConfig.setTempFileDirectory(tempFileDirectory);
271     }
272
273     @Override
274     public int getFileBackedStreamingThreshold() {
275         return raftConfig.getFileBackedStreamingThreshold();
276     }
277
278     private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
279         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
280     }
281
282     private void setPeerAddressResolver(final PeerAddressResolver resolver) {
283         raftConfig.setPeerAddressResolver(resolver);
284     }
285
286     private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
287         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
288                 TimeUnit.MILLISECONDS));
289     }
290
291     private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
292         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
293     }
294
295
296     private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
297         raftConfig.setIsolatedLeaderCheckInterval(
298                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
299     }
300
301     private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
302         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
303     }
304
305     private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
306         raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
307     }
308
309     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
310         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
311     }
312
313     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
314         checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100);
315         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
316     }
317
318     private void setSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
319         checkArgument(shardSnapshotDataThreshold >= 0);
320         raftConfig.setSnapshotDataThreshold(shardSnapshotDataThreshold);
321     }
322
323     private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
324         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
325     }
326
327     /**
328      * Set the interval in seconds after which a snapshot should be taken during the recovery process.
329      * 0 means don't take snapshots
330      */
331     private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
332         raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
333     }
334
335     @Deprecated
336     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
337         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
338         // maximumMessageSliceSize.
339         if (shardSnapshotChunkSize < maximumMessageSliceSize) {
340             raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
341         }
342     }
343
344     private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
345         raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
346         this.maximumMessageSliceSize = maximumMessageSliceSize;
347     }
348
349     private void setSyncIndexThreshold(final long syncIndexThreshold) {
350         raftConfig.setSyncIndexThreshold(syncIndexThreshold);
351     }
352
353     public int getShardBatchedModificationCount() {
354         return shardBatchedModificationCount;
355     }
356
357     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
358         return writeOnlyTransactionOptimizationsEnabled;
359     }
360
361     public long getShardCommitQueueExpiryTimeoutInMillis() {
362         return shardCommitQueueExpiryTimeoutInMillis;
363     }
364
365     public boolean isTransactionDebugContextEnabled() {
366         return transactionDebugContextEnabled;
367     }
368
369     @Deprecated(since = "7.0.0", forRemoval = true)
370     public boolean isUseTellBasedProtocol() {
371         return useTellBasedProtocol;
372     }
373
374     public boolean isUseLz4Compression() {
375         return useLz4Compression;
376     }
377
378     public ExportOnRecovery getExportOnRecovery() {
379         return exportOnRecovery;
380     }
381
382     public String getRecoveryExportBaseDir() {
383         return recoveryExportBaseDir;
384     }
385
386     @Override
387     public int getMaximumMessageSliceSize() {
388         return maximumMessageSliceSize;
389     }
390
391     @Override
392     public long getBackendAlivenessTimerInterval() {
393         return backendAlivenessTimerInterval;
394     }
395
396     @Override
397     public long getRequestTimeout() {
398         return requestTimeout;
399     }
400
401     @Override
402     public long getNoProgressTimeout() {
403         return noProgressTimeout;
404     }
405
406     public int getInitialPayloadSerializedBufferCapacity() {
407         return initialPayloadSerializedBufferCapacity;
408     }
409
410     public static class Builder {
411         private final DatastoreContext datastoreContext;
412
413         Builder(final DatastoreContext datastoreContext) {
414             this.datastoreContext = datastoreContext;
415         }
416
417         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
418             // TODO - this is defined in the yang DataStoreProperties but not currently used.
419             return this;
420         }
421
422         public Builder enableMetricCapture(final boolean enableMetricCapture) {
423             // TODO - this is defined in the yang DataStoreProperties but not currently used.
424             return this;
425         }
426
427
428         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
429             datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
430             return this;
431         }
432
433         public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
434             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
435         }
436
437         public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
438             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
439             return this;
440         }
441
442         public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
443             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
444             return this;
445         }
446
447         public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
448             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
449             return this;
450         }
451
452         public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
453             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
454             return this;
455         }
456
457         public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
458             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
459             return this;
460         }
461
462         public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
463             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
464             return this;
465         }
466
467         public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) {
468             checkArgument(recoverySnapshotIntervalSeconds >= 0);
469             datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds);
470             return this;
471         }
472
473         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
474             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
475             return this;
476         }
477
478         public Builder shardSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
479             datastoreContext.setSnapshotDataThreshold(shardSnapshotDataThreshold);
480             return this;
481         }
482
483         public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
484             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
485             return this;
486         }
487
488         public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
489             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
490             return this;
491         }
492
493         public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
494             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
495             return this;
496         }
497
498         public Builder shardInitializationTimeoutInSeconds(final long timeout) {
499             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
500         }
501
502         public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
503             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
504             return this;
505         }
506
507         public Builder initialSettleTimeoutMultiplier(final int multiplier) {
508             checkArgument(multiplier >= 0);
509             datastoreContext.initialSettleTimeoutMultiplier = multiplier;
510             return this;
511         }
512
513         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
514             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
515         }
516
517         public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
518             datastoreContext.configurationReader = configurationReader;
519             return this;
520         }
521
522         public Builder persistent(final boolean persistent) {
523             datastoreContext.persistent = persistent;
524             return this;
525         }
526
527         public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) {
528             datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite;
529             return this;
530         }
531
532         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
533             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
534             return this;
535         }
536
537         public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
538             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
539             return this;
540         }
541
542         public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
543             datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
544             return this;
545         }
546
547         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
548             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
549             return this;
550         }
551
552         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
553             datastoreContext.logicalStoreType = requireNonNull(logicalStoreType);
554
555             // Retain compatible naming
556             switch (logicalStoreType) {
557                 case CONFIGURATION:
558                     dataStoreName("config");
559                     break;
560                 case OPERATIONAL:
561                     dataStoreName("operational");
562                     break;
563                 default:
564                     dataStoreName(logicalStoreType.name());
565             }
566
567             return this;
568         }
569
570         public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
571             datastoreContext.storeRoot = storeRoot;
572             return this;
573         }
574
575         public Builder dataStoreName(final String dataStoreName) {
576             datastoreContext.dataStoreName = requireNonNull(dataStoreName);
577             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
578             return this;
579         }
580
581         public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
582             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
583             return this;
584         }
585
586         public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
587             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
588             return this;
589         }
590
591         public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
592             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
593             return this;
594         }
595
596         public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
597             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
598                     value, TimeUnit.SECONDS);
599             return this;
600         }
601
602         public Builder transactionDebugContextEnabled(final boolean value) {
603             datastoreContext.transactionDebugContextEnabled = value;
604             return this;
605         }
606
607         @Deprecated(since = "7.0.0", forRemoval = true)
608         public Builder useTellBasedProtocol(final boolean value) {
609             datastoreContext.useTellBasedProtocol = value;
610             return this;
611         }
612
613         public Builder useLz4Compression(final boolean value) {
614             datastoreContext.useLz4Compression = value;
615             return this;
616         }
617
618         public Builder exportOnRecovery(final ExportOnRecovery value) {
619             datastoreContext.exportOnRecovery = value;
620             return this;
621         }
622
623         public Builder recoveryExportBaseDir(final String value) {
624             datastoreContext.recoveryExportBaseDir = value;
625             return this;
626         }
627
628         /**
629          * For unit tests only.
630          */
631         @VisibleForTesting
632         public Builder shardManagerPersistenceId(final String id) {
633             datastoreContext.shardManagerPersistenceId = id;
634             return this;
635         }
636
637         public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
638             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
639             return this;
640         }
641
642         @Deprecated
643         public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
644             LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
645                     + "use maximum-message-slice-size instead");
646             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
647             return this;
648         }
649
650         public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
651             datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
652             return this;
653         }
654
655         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
656             datastoreContext.setPeerAddressResolver(resolver);
657             return this;
658         }
659
660         public Builder tempFileDirectory(final String tempFileDirectory) {
661             datastoreContext.setTempFileDirectory(tempFileDirectory);
662             return this;
663         }
664
665         public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
666             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
667             return this;
668         }
669
670         public Builder syncIndexThreshold(final long syncIndexThreshold) {
671             datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
672             return this;
673         }
674
675         public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
676             datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
677             return this;
678         }
679
680         public Builder frontendRequestTimeoutInSeconds(final long timeout) {
681             datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
682             return this;
683         }
684
685         public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
686             datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
687             return this;
688         }
689
690         public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
691             datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
692             return this;
693         }
694
695         public DatastoreContext build() {
696             if (datastoreContext.dataStoreName != null) {
697                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
698             }
699
700             return datastoreContext;
701         }
702     }
703 }