24b37751272b68741ab9e296b835244e6b337944
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.TimeUnit;
18 import org.apache.commons.text.WordUtils;
19 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
21 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
22 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
23 import org.opendaylight.controller.cluster.raft.ConfigParams;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.duration.FiniteDuration;
32
33 /**
34  * Contains contextual data for a data store.
35  *
36  * @author Thomas Pantelis
37  */
38 // Non-final for mocking
39 public class DatastoreContext implements ClientActorConfig {
40     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
41
42     public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
43         TimeUnit.MINUTES);
44     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
45     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
46     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
47     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
48     public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
49     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
50     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
51             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
52     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
53     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
54     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
55     public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
56     public static final boolean DEFAULT_PERSISTENT = true;
57     public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false;
58     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
59     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
60     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD = 0;
61     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
62     public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
63     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
64     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
65     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
66     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
67             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
68     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 480 * 1024; // 480KiB
69     public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
70     public static final ExportOnRecovery DEFAULT_EXPORT_ON_RECOVERY = ExportOnRecovery.Off;
71     public static final String DEFAULT_RECOVERY_EXPORT_BASE_DIR = "persistence-export";
72
73     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
74
75     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
76
77     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
78
79     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
80
81     private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
82     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
83     private String dataStoreMXBeanType;
84     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
85     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
86     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
87     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
88     private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
89     private boolean persistent = DEFAULT_PERSISTENT;
90     private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE;
91     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
92     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
93     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
94     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
95     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.of();
96     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
97     private boolean writeOnlyTransactionOptimizationsEnabled = true;
98     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
99     private boolean transactionDebugContextEnabled = false;
100     private String shardManagerPersistenceId;
101     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
102     private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
103     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
104     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
105     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
106     private boolean useLz4Compression = false;
107     private ExportOnRecovery exportOnRecovery = DEFAULT_EXPORT_ON_RECOVERY;
108     private String recoveryExportBaseDir = DEFAULT_RECOVERY_EXPORT_BASE_DIR;
109
110     public static Set<String> getGlobalDatastoreNames() {
111         return GLOBAL_DATASTORE_NAMES;
112     }
113
114     DatastoreContext() {
115         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
116         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
117         setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS);
118         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
119         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
120         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
121         setSnapshotDataThreshold(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD);
122         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
123         setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
124         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
125         setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
126     }
127
128     private DatastoreContext(final DatastoreContext other) {
129         shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
130         operationTimeoutInMillis = other.operationTimeoutInMillis;
131         dataStoreMXBeanType = other.dataStoreMXBeanType;
132         shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
133         shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
134         shardInitializationTimeout = other.shardInitializationTimeout;
135         shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
136         initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
137         persistent = other.persistent;
138         snapshotOnRootOverwrite = other.snapshotOnRootOverwrite;
139         configurationReader = other.configurationReader;
140         transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
141         dataStoreName = other.dataStoreName;
142         logicalStoreType = other.logicalStoreType;
143         storeRoot = other.storeRoot;
144         shardBatchedModificationCount = other.shardBatchedModificationCount;
145         writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
146         shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
147         transactionDebugContextEnabled = other.transactionDebugContextEnabled;
148         shardManagerPersistenceId = other.shardManagerPersistenceId;
149         backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
150         requestTimeout = other.requestTimeout;
151         noProgressTimeout = other.noProgressTimeout;
152         initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
153         useLz4Compression = other.useLz4Compression;
154         exportOnRecovery = other.exportOnRecovery;
155         recoveryExportBaseDir = other.recoveryExportBaseDir;
156
157         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
158         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
159         setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds());
160         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
161         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
162         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
163         setSnapshotDataThreshold(other.raftConfig.getSnapshotDataThreshold());
164         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
165         setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
166         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
167         setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
168         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
169         setTempFileDirectory(other.getTempFileDirectory());
170         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
171         setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
172     }
173
174     public static Builder newBuilder() {
175         return new Builder(new DatastoreContext());
176     }
177
178     public static Builder newBuilderFrom(final DatastoreContext context) {
179         return new Builder(new DatastoreContext(context));
180     }
181
182     public FiniteDuration getShardTransactionIdleTimeout() {
183         return shardTransactionIdleTimeout;
184     }
185
186     public String getDataStoreMXBeanType() {
187         return dataStoreMXBeanType;
188     }
189
190     public long getOperationTimeoutInMillis() {
191         return operationTimeoutInMillis;
192     }
193
194     public ConfigParams getShardRaftConfig() {
195         return raftConfig;
196     }
197
198     public int getShardTransactionCommitTimeoutInSeconds() {
199         return shardTransactionCommitTimeoutInSeconds;
200     }
201
202     public int getShardTransactionCommitQueueCapacity() {
203         return shardTransactionCommitQueueCapacity;
204     }
205
206     public Timeout getShardInitializationTimeout() {
207         return shardInitializationTimeout;
208     }
209
210     public Timeout getShardLeaderElectionTimeout() {
211         return shardLeaderElectionTimeout;
212     }
213
214     /**
215      * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards
216      * on the local node to settle.
217      *
218      * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely.
219      */
220     public int getInitialSettleTimeoutMultiplier() {
221         return initialSettleTimeoutMultiplier;
222     }
223
224     public boolean isPersistent() {
225         return persistent;
226     }
227
228     public boolean isSnapshotOnRootOverwrite() {
229         return snapshotOnRootOverwrite;
230     }
231
232     public AkkaConfigurationReader getConfigurationReader() {
233         return configurationReader;
234     }
235
236     public long getShardElectionTimeoutFactor() {
237         return raftConfig.getElectionTimeoutFactor();
238     }
239
240     public String getDataStoreName() {
241         return dataStoreName;
242     }
243
244     public LogicalDatastoreType getLogicalStoreType() {
245         return logicalStoreType;
246     }
247
248     public YangInstanceIdentifier getStoreRoot() {
249         return storeRoot;
250     }
251
252     public long getTransactionCreationInitialRateLimit() {
253         return transactionCreationInitialRateLimit;
254     }
255
256     public String getShardManagerPersistenceId() {
257         return shardManagerPersistenceId;
258     }
259
260     @Override
261     public String getTempFileDirectory() {
262         return raftConfig.getTempFileDirectory();
263     }
264
265     private void setTempFileDirectory(final String tempFileDirectory) {
266         raftConfig.setTempFileDirectory(tempFileDirectory);
267     }
268
269     @Override
270     public int getFileBackedStreamingThreshold() {
271         return raftConfig.getFileBackedStreamingThreshold();
272     }
273
274     private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
275         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
276     }
277
278     private void setPeerAddressResolver(final PeerAddressResolver resolver) {
279         raftConfig.setPeerAddressResolver(resolver);
280     }
281
282     private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
283         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
284                 TimeUnit.MILLISECONDS));
285     }
286
287     private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
288         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
289     }
290
291
292     private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
293         raftConfig.setIsolatedLeaderCheckInterval(
294                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
295     }
296
297     private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
298         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
299     }
300
301     private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
302         raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
303     }
304
305     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
306         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
307     }
308
309     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
310         checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100);
311         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
312     }
313
314     private void setSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
315         checkArgument(shardSnapshotDataThreshold >= 0);
316         raftConfig.setSnapshotDataThreshold(shardSnapshotDataThreshold);
317     }
318
319     private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
320         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
321     }
322
323     /**
324      * Set the interval in seconds after which a snapshot should be taken during the recovery process.
325      * 0 means don't take snapshots
326      */
327     private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
328         raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
329     }
330
331     private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
332         raftConfig.setMaximumMessageSliceSize(maximumMessageSliceSize);
333         this.maximumMessageSliceSize = maximumMessageSliceSize;
334     }
335
336     private void setSyncIndexThreshold(final long syncIndexThreshold) {
337         raftConfig.setSyncIndexThreshold(syncIndexThreshold);
338     }
339
340     public int getShardBatchedModificationCount() {
341         return shardBatchedModificationCount;
342     }
343
344     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
345         return writeOnlyTransactionOptimizationsEnabled;
346     }
347
348     public long getShardCommitQueueExpiryTimeoutInMillis() {
349         return shardCommitQueueExpiryTimeoutInMillis;
350     }
351
352     public boolean isTransactionDebugContextEnabled() {
353         return transactionDebugContextEnabled;
354     }
355
356     public boolean isUseLz4Compression() {
357         return useLz4Compression;
358     }
359
360     public ExportOnRecovery getExportOnRecovery() {
361         return exportOnRecovery;
362     }
363
364     public String getRecoveryExportBaseDir() {
365         return recoveryExportBaseDir;
366     }
367
368     @Override
369     public int getMaximumMessageSliceSize() {
370         return maximumMessageSliceSize;
371     }
372
373     @Override
374     public long getBackendAlivenessTimerInterval() {
375         return backendAlivenessTimerInterval;
376     }
377
378     @Override
379     public long getRequestTimeout() {
380         return requestTimeout;
381     }
382
383     @Override
384     public long getNoProgressTimeout() {
385         return noProgressTimeout;
386     }
387
388     public int getInitialPayloadSerializedBufferCapacity() {
389         return initialPayloadSerializedBufferCapacity;
390     }
391
392     public static class Builder {
393         private final DatastoreContext datastoreContext;
394
395         Builder(final DatastoreContext datastoreContext) {
396             this.datastoreContext = datastoreContext;
397         }
398
399         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
400             // TODO - this is defined in the yang DataStoreProperties but not currently used.
401             return this;
402         }
403
404         public Builder enableMetricCapture(final boolean enableMetricCapture) {
405             // TODO - this is defined in the yang DataStoreProperties but not currently used.
406             return this;
407         }
408
409
410         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
411             datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
412             return this;
413         }
414
415         public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
416             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
417         }
418
419         public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
420             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
421             return this;
422         }
423
424         public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
425             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
426             return this;
427         }
428
429         public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
430             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
431             return this;
432         }
433
434         public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
435             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
436             return this;
437         }
438
439         public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
440             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
441             return this;
442         }
443
444         public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
445             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
446             return this;
447         }
448
449         public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) {
450             checkArgument(recoverySnapshotIntervalSeconds >= 0);
451             datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds);
452             return this;
453         }
454
455         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
456             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
457             return this;
458         }
459
460         public Builder shardSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
461             datastoreContext.setSnapshotDataThreshold(shardSnapshotDataThreshold);
462             return this;
463         }
464
465         public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
466             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
467             return this;
468         }
469
470         public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
471             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
472             return this;
473         }
474
475         public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
476             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
477             return this;
478         }
479
480         public Builder shardInitializationTimeoutInSeconds(final long timeout) {
481             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
482         }
483
484         public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
485             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
486             return this;
487         }
488
489         public Builder initialSettleTimeoutMultiplier(final int multiplier) {
490             checkArgument(multiplier >= 0);
491             datastoreContext.initialSettleTimeoutMultiplier = multiplier;
492             return this;
493         }
494
495         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
496             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
497         }
498
499         public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
500             datastoreContext.configurationReader = configurationReader;
501             return this;
502         }
503
504         public Builder persistent(final boolean persistent) {
505             datastoreContext.persistent = persistent;
506             return this;
507         }
508
509         public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) {
510             datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite;
511             return this;
512         }
513
514         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
515             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
516             return this;
517         }
518
519         public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
520             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
521             return this;
522         }
523
524         public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
525             datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
526             return this;
527         }
528
529         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
530             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
531             return this;
532         }
533
534         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
535             datastoreContext.logicalStoreType = requireNonNull(logicalStoreType);
536
537             // Retain compatible naming
538             switch (logicalStoreType) {
539                 case CONFIGURATION:
540                     dataStoreName("config");
541                     break;
542                 case OPERATIONAL:
543                     dataStoreName("operational");
544                     break;
545                 default:
546                     dataStoreName(logicalStoreType.name());
547             }
548
549             return this;
550         }
551
552         public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
553             datastoreContext.storeRoot = storeRoot;
554             return this;
555         }
556
557         public Builder dataStoreName(final String dataStoreName) {
558             datastoreContext.dataStoreName = requireNonNull(dataStoreName);
559             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
560             return this;
561         }
562
563         public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
564             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
565             return this;
566         }
567
568         public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
569             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
570             return this;
571         }
572
573         public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
574             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
575             return this;
576         }
577
578         public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
579             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
580                     value, TimeUnit.SECONDS);
581             return this;
582         }
583
584         public Builder transactionDebugContextEnabled(final boolean value) {
585             datastoreContext.transactionDebugContextEnabled = value;
586             return this;
587         }
588
589         public Builder useLz4Compression(final boolean value) {
590             datastoreContext.useLz4Compression = value;
591             return this;
592         }
593
594         public Builder exportOnRecovery(final ExportOnRecovery value) {
595             datastoreContext.exportOnRecovery = value;
596             return this;
597         }
598
599         public Builder recoveryExportBaseDir(final String value) {
600             datastoreContext.recoveryExportBaseDir = value;
601             return this;
602         }
603
604         /**
605          * For unit tests only.
606          */
607         @VisibleForTesting
608         public Builder shardManagerPersistenceId(final String id) {
609             datastoreContext.shardManagerPersistenceId = id;
610             return this;
611         }
612
613         public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
614             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
615             return this;
616         }
617
618         public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
619             datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
620             return this;
621         }
622
623         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
624             datastoreContext.setPeerAddressResolver(resolver);
625             return this;
626         }
627
628         public Builder tempFileDirectory(final String tempFileDirectory) {
629             datastoreContext.setTempFileDirectory(tempFileDirectory);
630             return this;
631         }
632
633         public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
634             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
635             return this;
636         }
637
638         public Builder syncIndexThreshold(final long syncIndexThreshold) {
639             datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
640             return this;
641         }
642
643         public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
644             datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
645             return this;
646         }
647
648         public Builder frontendRequestTimeoutInSeconds(final long timeout) {
649             datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
650             return this;
651         }
652
653         public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
654             datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
655             return this;
656         }
657
658         public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
659             datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
660             return this;
661         }
662
663         public DatastoreContext build() {
664             if (datastoreContext.dataStoreName != null) {
665                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
666             }
667
668             return datastoreContext;
669         }
670     }
671 }