Speed up DatastoreContextIntrospector a bit
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.TimeUnit;
18 import org.apache.commons.text.WordUtils;
19 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
21 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
22 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
23 import org.opendaylight.controller.cluster.raft.ConfigParams;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.duration.FiniteDuration;
31
32 /**
33  * Contains contextual data for a data store.
34  *
35  * @author Thomas Pantelis
36  */
37 // Non-final for mocking
38 public class DatastoreContext implements ClientActorConfig {
39     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
40
41     public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
42         TimeUnit.MINUTES);
43     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
44     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
45     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
46     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
47     public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
48     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
49     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
50             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
51     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
52     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
53     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
54     public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
55     public static final boolean DEFAULT_PERSISTENT = true;
56     public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false;
57     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
58     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
59     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
60     public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
61     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
62     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
63     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
64     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
65             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
66     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
67     public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
68
69     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
70
71     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
72
73     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
74
75     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
76
77     private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
78     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
79     private String dataStoreMXBeanType;
80     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
81     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
82     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
83     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
84     private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
85     private boolean persistent = DEFAULT_PERSISTENT;
86     private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE;
87     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
88     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
89     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
90     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
91     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty();
92     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
93     private boolean writeOnlyTransactionOptimizationsEnabled = true;
94     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
95     private boolean useTellBasedProtocol = false;
96     private boolean transactionDebugContextEnabled = false;
97     private String shardManagerPersistenceId;
98     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
99     private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
100     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
101     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
102     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
103
104     public static Set<String> getGlobalDatastoreNames() {
105         return GLOBAL_DATASTORE_NAMES;
106     }
107
108     DatastoreContext() {
109         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
110         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
111         setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS);
112         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
113         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
114         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
115         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
116         setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
117         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
118         setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
119     }
120
121     private DatastoreContext(final DatastoreContext other) {
122         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
123         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
124         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
125         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
126         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
127         this.shardInitializationTimeout = other.shardInitializationTimeout;
128         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
129         this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
130         this.persistent = other.persistent;
131         this.snapshotOnRootOverwrite = other.snapshotOnRootOverwrite;
132         this.configurationReader = other.configurationReader;
133         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
134         this.dataStoreName = other.dataStoreName;
135         this.logicalStoreType = other.logicalStoreType;
136         this.storeRoot = other.storeRoot;
137         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
138         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
139         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
140         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
141         this.shardManagerPersistenceId = other.shardManagerPersistenceId;
142         this.useTellBasedProtocol = other.useTellBasedProtocol;
143         this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
144         this.requestTimeout = other.requestTimeout;
145         this.noProgressTimeout = other.noProgressTimeout;
146         this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
147
148         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
149         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
150         setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds());
151         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
152         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
153         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
154         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
155         setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
156         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
157         setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
158         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
159         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
160         setTempFileDirectory(other.getTempFileDirectory());
161         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
162         setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
163     }
164
165     public static Builder newBuilder() {
166         return new Builder(new DatastoreContext());
167     }
168
169     public static Builder newBuilderFrom(final DatastoreContext context) {
170         return new Builder(new DatastoreContext(context));
171     }
172
173     public FiniteDuration getShardTransactionIdleTimeout() {
174         return shardTransactionIdleTimeout;
175     }
176
177     public String getDataStoreMXBeanType() {
178         return dataStoreMXBeanType;
179     }
180
181     public long getOperationTimeoutInMillis() {
182         return operationTimeoutInMillis;
183     }
184
185     public ConfigParams getShardRaftConfig() {
186         return raftConfig;
187     }
188
189     public int getShardTransactionCommitTimeoutInSeconds() {
190         return shardTransactionCommitTimeoutInSeconds;
191     }
192
193     public int getShardTransactionCommitQueueCapacity() {
194         return shardTransactionCommitQueueCapacity;
195     }
196
197     public Timeout getShardInitializationTimeout() {
198         return shardInitializationTimeout;
199     }
200
201     public Timeout getShardLeaderElectionTimeout() {
202         return shardLeaderElectionTimeout;
203     }
204
205     /**
206      * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards
207      * on the local node to settle.
208      *
209      * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely.
210      */
211     public int getInitialSettleTimeoutMultiplier() {
212         return initialSettleTimeoutMultiplier;
213     }
214
215     public boolean isPersistent() {
216         return persistent;
217     }
218
219     public boolean isSnapshotOnRootOverwrite() {
220         return this.snapshotOnRootOverwrite;
221     }
222
223     public AkkaConfigurationReader getConfigurationReader() {
224         return configurationReader;
225     }
226
227     public long getShardElectionTimeoutFactor() {
228         return raftConfig.getElectionTimeoutFactor();
229     }
230
231     public String getDataStoreName() {
232         return dataStoreName;
233     }
234
235     public LogicalDatastoreType getLogicalStoreType() {
236         return logicalStoreType;
237     }
238
239     public YangInstanceIdentifier getStoreRoot() {
240         return storeRoot;
241     }
242
243     public long getTransactionCreationInitialRateLimit() {
244         return transactionCreationInitialRateLimit;
245     }
246
247     public String getShardManagerPersistenceId() {
248         return shardManagerPersistenceId;
249     }
250
251     @Override
252     public String getTempFileDirectory() {
253         return raftConfig.getTempFileDirectory();
254     }
255
256     private void setTempFileDirectory(final String tempFileDirectory) {
257         raftConfig.setTempFileDirectory(tempFileDirectory);
258     }
259
260     @Override
261     public int getFileBackedStreamingThreshold() {
262         return raftConfig.getFileBackedStreamingThreshold();
263     }
264
265     private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
266         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
267     }
268
269     private void setPeerAddressResolver(final PeerAddressResolver resolver) {
270         raftConfig.setPeerAddressResolver(resolver);
271     }
272
273     private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
274         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
275                 TimeUnit.MILLISECONDS));
276     }
277
278     private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
279         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
280     }
281
282
283     private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
284         raftConfig.setIsolatedLeaderCheckInterval(
285                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
286     }
287
288     private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
289         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
290     }
291
292     private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
293         raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
294     }
295
296     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
297         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
298     }
299
300     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
301         checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100);
302         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
303     }
304
305     private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
306         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
307     }
308
309     /**
310      * Set the interval in seconds after which a snapshot should be taken during the recovery process.
311      * 0 means don't take snapshots
312      */
313     private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
314         raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
315     }
316
317     @Deprecated
318     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
319         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
320         // maximumMessageSliceSize.
321         if (shardSnapshotChunkSize < maximumMessageSliceSize) {
322             raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
323         }
324     }
325
326     private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
327         raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
328         this.maximumMessageSliceSize = maximumMessageSliceSize;
329     }
330
331     private void setSyncIndexThreshold(final long syncIndexThreshold) {
332         raftConfig.setSyncIndexThreshold(syncIndexThreshold);
333     }
334
335     public int getShardBatchedModificationCount() {
336         return shardBatchedModificationCount;
337     }
338
339     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
340         return writeOnlyTransactionOptimizationsEnabled;
341     }
342
343     public long getShardCommitQueueExpiryTimeoutInMillis() {
344         return shardCommitQueueExpiryTimeoutInMillis;
345     }
346
347     public boolean isTransactionDebugContextEnabled() {
348         return transactionDebugContextEnabled;
349     }
350
351     public boolean isUseTellBasedProtocol() {
352         return useTellBasedProtocol;
353     }
354
355     @Override
356     public int getMaximumMessageSliceSize() {
357         return maximumMessageSliceSize;
358     }
359
360     @Override
361     public long getBackendAlivenessTimerInterval() {
362         return backendAlivenessTimerInterval;
363     }
364
365     @Override
366     public long getRequestTimeout() {
367         return requestTimeout;
368     }
369
370     @Override
371     public long getNoProgressTimeout() {
372         return noProgressTimeout;
373     }
374
375     public int getInitialPayloadSerializedBufferCapacity() {
376         return initialPayloadSerializedBufferCapacity;
377     }
378
379     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
380         private final DatastoreContext datastoreContext;
381
382         Builder(final DatastoreContext datastoreContext) {
383             this.datastoreContext = datastoreContext;
384         }
385
386         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
387             // TODO - this is defined in the yang DataStoreProperties but not currently used.
388             return this;
389         }
390
391         public Builder enableMetricCapture(final boolean enableMetricCapture) {
392             // TODO - this is defined in the yang DataStoreProperties but not currently used.
393             return this;
394         }
395
396
397         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
398             datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
399             return this;
400         }
401
402         public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
403             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
404         }
405
406         public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
407             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
408             return this;
409         }
410
411         public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
412             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
413             return this;
414         }
415
416         public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
417             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
418             return this;
419         }
420
421         public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
422             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
423             return this;
424         }
425
426         public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
427             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
428             return this;
429         }
430
431         public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
432             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
433             return this;
434         }
435
436         public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) {
437             checkArgument(recoverySnapshotIntervalSeconds >= 0);
438             datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds);
439             return this;
440         }
441
442         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
443             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
444             return this;
445         }
446
447         public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
448             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
449             return this;
450         }
451
452         public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
453             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
454             return this;
455         }
456
457         public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
458             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
459             return this;
460         }
461
462         public Builder shardInitializationTimeoutInSeconds(final long timeout) {
463             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
464         }
465
466         public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
467             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
468             return this;
469         }
470
471         public Builder initialSettleTimeoutMultiplier(final int multiplier) {
472             checkArgument(multiplier >= 0);
473             datastoreContext.initialSettleTimeoutMultiplier = multiplier;
474             return this;
475         }
476
477         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
478             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
479         }
480
481         public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
482             datastoreContext.configurationReader = configurationReader;
483             return this;
484         }
485
486         public Builder persistent(final boolean persistent) {
487             datastoreContext.persistent = persistent;
488             return this;
489         }
490
491         public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) {
492             datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite;
493             return this;
494         }
495
496         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
497             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
498             return this;
499         }
500
501         public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
502             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
503             return this;
504         }
505
506         public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
507             datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
508             return this;
509         }
510
511         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
512             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
513             return this;
514         }
515
516         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
517             datastoreContext.logicalStoreType = requireNonNull(logicalStoreType);
518
519             // Retain compatible naming
520             switch (logicalStoreType) {
521                 case CONFIGURATION:
522                     dataStoreName("config");
523                     break;
524                 case OPERATIONAL:
525                     dataStoreName("operational");
526                     break;
527                 default:
528                     dataStoreName(logicalStoreType.name());
529             }
530
531             return this;
532         }
533
534         public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
535             datastoreContext.storeRoot = storeRoot;
536             return this;
537         }
538
539         public Builder dataStoreName(final String dataStoreName) {
540             datastoreContext.dataStoreName = requireNonNull(dataStoreName);
541             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
542             return this;
543         }
544
545         public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
546             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
547             return this;
548         }
549
550         public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
551             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
552             return this;
553         }
554
555         public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
556             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
557             return this;
558         }
559
560         public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
561             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
562                     value, TimeUnit.SECONDS);
563             return this;
564         }
565
566         public Builder transactionDebugContextEnabled(final boolean value) {
567             datastoreContext.transactionDebugContextEnabled = value;
568             return this;
569         }
570
571         @Deprecated(forRemoval = true)
572         public Builder maxShardDataChangeExecutorPoolSize(final int newMaxShardDataChangeExecutorPoolSize) {
573             return this;
574         }
575
576         @Deprecated(forRemoval = true)
577         public Builder maxShardDataChangeExecutorQueueSize(final int newMaxShardDataChangeExecutorQueueSize) {
578             return this;
579         }
580
581         @Deprecated(forRemoval = true)
582         public Builder maxShardDataChangeListenerQueueSize(final int newMaxShardDataChangeListenerQueueSize) {
583             return this;
584         }
585
586         @Deprecated(forRemoval = true)
587         public Builder maxShardDataStoreExecutorQueueSize(final int newMaxShardDataStoreExecutorQueueSize) {
588             return this;
589         }
590
591         public Builder useTellBasedProtocol(final boolean value) {
592             datastoreContext.useTellBasedProtocol = value;
593             return this;
594         }
595
596         /**
597          * For unit tests only.
598          */
599         @VisibleForTesting
600         public Builder shardManagerPersistenceId(final String id) {
601             datastoreContext.shardManagerPersistenceId = id;
602             return this;
603         }
604
605         public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
606             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
607             return this;
608         }
609
610         @Deprecated
611         public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
612             LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
613                     + "use maximum-message-slice-size instead");
614             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
615             return this;
616         }
617
618         public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
619             datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
620             return this;
621         }
622
623         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
624             datastoreContext.setPeerAddressResolver(resolver);
625             return this;
626         }
627
628         public Builder tempFileDirectory(final String tempFileDirectory) {
629             datastoreContext.setTempFileDirectory(tempFileDirectory);
630             return this;
631         }
632
633         public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
634             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
635             return this;
636         }
637
638         public Builder syncIndexThreshold(final long syncIndexThreshold) {
639             datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
640             return this;
641         }
642
643         public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
644             datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
645             return this;
646         }
647
648         public Builder frontendRequestTimeoutInSeconds(final long timeout) {
649             datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
650             return this;
651         }
652
653         public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
654             datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
655             return this;
656         }
657
658         public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
659             datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
660             return this;
661         }
662
663         @Override
664         public DatastoreContext build() {
665             if (datastoreContext.dataStoreName != null) {
666                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
667             }
668
669             return datastoreContext;
670         }
671     }
672 }