Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.TimeUnit;
18 import org.apache.commons.text.WordUtils;
19 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
21 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
22 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
23 import org.opendaylight.controller.cluster.raft.ConfigParams;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.duration.FiniteDuration;
31
32 /**
33  * Contains contextual data for a data store.
34  *
35  * @author Thomas Pantelis
36  */
37 // Non-final for mocking
38 public class DatastoreContext implements ClientActorConfig {
39     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
40
41     public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
42         TimeUnit.MINUTES);
43     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
44     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
45     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
46     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
47     public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
48     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
49     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
50             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
51     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
52     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
53     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
54     public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
55     public static final boolean DEFAULT_PERSISTENT = true;
56     public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false;
57     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
58     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
59     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD = 0;
60     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
61     public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
62     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
63     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
64     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
65     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
66             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
67     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
68     public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
69
70     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
71
72     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
73
74     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
75
76     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
77
78     private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
79     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
80     private String dataStoreMXBeanType;
81     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
82     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
83     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
84     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
85     private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
86     private boolean persistent = DEFAULT_PERSISTENT;
87     private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE;
88     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
89     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
90     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
91     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
92     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty();
93     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
94     private boolean writeOnlyTransactionOptimizationsEnabled = true;
95     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
96     private boolean useTellBasedProtocol = false;
97     private boolean transactionDebugContextEnabled = false;
98     private String shardManagerPersistenceId;
99     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
100     private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
101     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
102     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
103     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
104     private boolean useLz4Compression = false;
105
106     public static Set<String> getGlobalDatastoreNames() {
107         return GLOBAL_DATASTORE_NAMES;
108     }
109
110     DatastoreContext() {
111         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
112         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
113         setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS);
114         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
115         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
116         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
117         setSnapshotDataThreshold(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD);
118         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
119         setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
120         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
121         setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
122     }
123
124     private DatastoreContext(final DatastoreContext other) {
125         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
126         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
127         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
128         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
129         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
130         this.shardInitializationTimeout = other.shardInitializationTimeout;
131         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
132         this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
133         this.persistent = other.persistent;
134         this.snapshotOnRootOverwrite = other.snapshotOnRootOverwrite;
135         this.configurationReader = other.configurationReader;
136         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
137         this.dataStoreName = other.dataStoreName;
138         this.logicalStoreType = other.logicalStoreType;
139         this.storeRoot = other.storeRoot;
140         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
141         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
142         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
143         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
144         this.shardManagerPersistenceId = other.shardManagerPersistenceId;
145         this.useTellBasedProtocol = other.useTellBasedProtocol;
146         this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
147         this.requestTimeout = other.requestTimeout;
148         this.noProgressTimeout = other.noProgressTimeout;
149         this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
150         this.useLz4Compression = other.useLz4Compression;
151
152         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
153         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
154         setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds());
155         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
156         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
157         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
158         setSnapshotDataThreshold(other.raftConfig.getSnapshotDataThreshold());
159         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
160         setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
161         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
162         setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
163         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
164         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
165         setTempFileDirectory(other.getTempFileDirectory());
166         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
167         setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
168     }
169
170     public static Builder newBuilder() {
171         return new Builder(new DatastoreContext());
172     }
173
174     public static Builder newBuilderFrom(final DatastoreContext context) {
175         return new Builder(new DatastoreContext(context));
176     }
177
178     public FiniteDuration getShardTransactionIdleTimeout() {
179         return shardTransactionIdleTimeout;
180     }
181
182     public String getDataStoreMXBeanType() {
183         return dataStoreMXBeanType;
184     }
185
186     public long getOperationTimeoutInMillis() {
187         return operationTimeoutInMillis;
188     }
189
190     public ConfigParams getShardRaftConfig() {
191         return raftConfig;
192     }
193
194     public int getShardTransactionCommitTimeoutInSeconds() {
195         return shardTransactionCommitTimeoutInSeconds;
196     }
197
198     public int getShardTransactionCommitQueueCapacity() {
199         return shardTransactionCommitQueueCapacity;
200     }
201
202     public Timeout getShardInitializationTimeout() {
203         return shardInitializationTimeout;
204     }
205
206     public Timeout getShardLeaderElectionTimeout() {
207         return shardLeaderElectionTimeout;
208     }
209
210     /**
211      * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards
212      * on the local node to settle.
213      *
214      * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely.
215      */
216     public int getInitialSettleTimeoutMultiplier() {
217         return initialSettleTimeoutMultiplier;
218     }
219
220     public boolean isPersistent() {
221         return persistent;
222     }
223
224     public boolean isSnapshotOnRootOverwrite() {
225         return this.snapshotOnRootOverwrite;
226     }
227
228     public AkkaConfigurationReader getConfigurationReader() {
229         return configurationReader;
230     }
231
232     public long getShardElectionTimeoutFactor() {
233         return raftConfig.getElectionTimeoutFactor();
234     }
235
236     public String getDataStoreName() {
237         return dataStoreName;
238     }
239
240     public LogicalDatastoreType getLogicalStoreType() {
241         return logicalStoreType;
242     }
243
244     public YangInstanceIdentifier getStoreRoot() {
245         return storeRoot;
246     }
247
248     public long getTransactionCreationInitialRateLimit() {
249         return transactionCreationInitialRateLimit;
250     }
251
252     public String getShardManagerPersistenceId() {
253         return shardManagerPersistenceId;
254     }
255
256     @Override
257     public String getTempFileDirectory() {
258         return raftConfig.getTempFileDirectory();
259     }
260
261     private void setTempFileDirectory(final String tempFileDirectory) {
262         raftConfig.setTempFileDirectory(tempFileDirectory);
263     }
264
265     @Override
266     public int getFileBackedStreamingThreshold() {
267         return raftConfig.getFileBackedStreamingThreshold();
268     }
269
270     private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
271         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
272     }
273
274     private void setPeerAddressResolver(final PeerAddressResolver resolver) {
275         raftConfig.setPeerAddressResolver(resolver);
276     }
277
278     private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
279         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
280                 TimeUnit.MILLISECONDS));
281     }
282
283     private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
284         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
285     }
286
287
288     private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
289         raftConfig.setIsolatedLeaderCheckInterval(
290                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
291     }
292
293     private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
294         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
295     }
296
297     private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
298         raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
299     }
300
301     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
302         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
303     }
304
305     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
306         checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100);
307         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
308     }
309
310     private void setSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
311         checkArgument(shardSnapshotDataThreshold >= 0);
312         raftConfig.setSnapshotDataThreshold(shardSnapshotDataThreshold);
313     }
314
315     private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
316         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
317     }
318
319     /**
320      * Set the interval in seconds after which a snapshot should be taken during the recovery process.
321      * 0 means don't take snapshots
322      */
323     private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
324         raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
325     }
326
327     @Deprecated
328     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
329         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
330         // maximumMessageSliceSize.
331         if (shardSnapshotChunkSize < maximumMessageSliceSize) {
332             raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
333         }
334     }
335
336     private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
337         raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
338         this.maximumMessageSliceSize = maximumMessageSliceSize;
339     }
340
341     private void setSyncIndexThreshold(final long syncIndexThreshold) {
342         raftConfig.setSyncIndexThreshold(syncIndexThreshold);
343     }
344
345     public int getShardBatchedModificationCount() {
346         return shardBatchedModificationCount;
347     }
348
349     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
350         return writeOnlyTransactionOptimizationsEnabled;
351     }
352
353     public long getShardCommitQueueExpiryTimeoutInMillis() {
354         return shardCommitQueueExpiryTimeoutInMillis;
355     }
356
357     public boolean isTransactionDebugContextEnabled() {
358         return transactionDebugContextEnabled;
359     }
360
361     public boolean isUseTellBasedProtocol() {
362         return useTellBasedProtocol;
363     }
364
365     public boolean isUseLz4Compression() {
366         return useLz4Compression;
367     }
368
369     @Override
370     public int getMaximumMessageSliceSize() {
371         return maximumMessageSliceSize;
372     }
373
374     @Override
375     public long getBackendAlivenessTimerInterval() {
376         return backendAlivenessTimerInterval;
377     }
378
379     @Override
380     public long getRequestTimeout() {
381         return requestTimeout;
382     }
383
384     @Override
385     public long getNoProgressTimeout() {
386         return noProgressTimeout;
387     }
388
389     public int getInitialPayloadSerializedBufferCapacity() {
390         return initialPayloadSerializedBufferCapacity;
391     }
392
393     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
394         private final DatastoreContext datastoreContext;
395
396         Builder(final DatastoreContext datastoreContext) {
397             this.datastoreContext = datastoreContext;
398         }
399
400         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
401             // TODO - this is defined in the yang DataStoreProperties but not currently used.
402             return this;
403         }
404
405         public Builder enableMetricCapture(final boolean enableMetricCapture) {
406             // TODO - this is defined in the yang DataStoreProperties but not currently used.
407             return this;
408         }
409
410
411         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
412             datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
413             return this;
414         }
415
416         public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
417             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
418         }
419
420         public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
421             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
422             return this;
423         }
424
425         public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
426             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
427             return this;
428         }
429
430         public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
431             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
432             return this;
433         }
434
435         public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
436             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
437             return this;
438         }
439
440         public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
441             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
442             return this;
443         }
444
445         public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
446             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
447             return this;
448         }
449
450         public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) {
451             checkArgument(recoverySnapshotIntervalSeconds >= 0);
452             datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds);
453             return this;
454         }
455
456         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
457             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
458             return this;
459         }
460
461         public Builder shardSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
462             datastoreContext.setSnapshotDataThreshold(shardSnapshotDataThreshold);
463             return this;
464         }
465
466         public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
467             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
468             return this;
469         }
470
471         public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
472             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
473             return this;
474         }
475
476         public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
477             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
478             return this;
479         }
480
481         public Builder shardInitializationTimeoutInSeconds(final long timeout) {
482             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
483         }
484
485         public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
486             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
487             return this;
488         }
489
490         public Builder initialSettleTimeoutMultiplier(final int multiplier) {
491             checkArgument(multiplier >= 0);
492             datastoreContext.initialSettleTimeoutMultiplier = multiplier;
493             return this;
494         }
495
496         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
497             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
498         }
499
500         public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
501             datastoreContext.configurationReader = configurationReader;
502             return this;
503         }
504
505         public Builder persistent(final boolean persistent) {
506             datastoreContext.persistent = persistent;
507             return this;
508         }
509
510         public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) {
511             datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite;
512             return this;
513         }
514
515         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
516             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
517             return this;
518         }
519
520         public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
521             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
522             return this;
523         }
524
525         public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
526             datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
527             return this;
528         }
529
530         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
531             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
532             return this;
533         }
534
535         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
536             datastoreContext.logicalStoreType = requireNonNull(logicalStoreType);
537
538             // Retain compatible naming
539             switch (logicalStoreType) {
540                 case CONFIGURATION:
541                     dataStoreName("config");
542                     break;
543                 case OPERATIONAL:
544                     dataStoreName("operational");
545                     break;
546                 default:
547                     dataStoreName(logicalStoreType.name());
548             }
549
550             return this;
551         }
552
553         public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
554             datastoreContext.storeRoot = storeRoot;
555             return this;
556         }
557
558         public Builder dataStoreName(final String dataStoreName) {
559             datastoreContext.dataStoreName = requireNonNull(dataStoreName);
560             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
561             return this;
562         }
563
564         public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
565             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
566             return this;
567         }
568
569         public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
570             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
571             return this;
572         }
573
574         public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
575             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
576             return this;
577         }
578
579         public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
580             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
581                     value, TimeUnit.SECONDS);
582             return this;
583         }
584
585         public Builder transactionDebugContextEnabled(final boolean value) {
586             datastoreContext.transactionDebugContextEnabled = value;
587             return this;
588         }
589
590         @Deprecated(forRemoval = true)
591         public Builder maxShardDataChangeExecutorPoolSize(final int newMaxShardDataChangeExecutorPoolSize) {
592             return this;
593         }
594
595         @Deprecated(forRemoval = true)
596         public Builder maxShardDataChangeExecutorQueueSize(final int newMaxShardDataChangeExecutorQueueSize) {
597             return this;
598         }
599
600         @Deprecated(forRemoval = true)
601         public Builder maxShardDataChangeListenerQueueSize(final int newMaxShardDataChangeListenerQueueSize) {
602             return this;
603         }
604
605         @Deprecated(forRemoval = true)
606         public Builder maxShardDataStoreExecutorQueueSize(final int newMaxShardDataStoreExecutorQueueSize) {
607             return this;
608         }
609
610         public Builder useTellBasedProtocol(final boolean value) {
611             datastoreContext.useTellBasedProtocol = value;
612             return this;
613         }
614
615         public Builder useLz4Compression(final boolean value) {
616             datastoreContext.useLz4Compression = value;
617             return this;
618         }
619
620         /**
621          * For unit tests only.
622          */
623         @VisibleForTesting
624         public Builder shardManagerPersistenceId(final String id) {
625             datastoreContext.shardManagerPersistenceId = id;
626             return this;
627         }
628
629         public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
630             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
631             return this;
632         }
633
634         @Deprecated
635         public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
636             LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
637                     + "use maximum-message-slice-size instead");
638             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
639             return this;
640         }
641
642         public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
643             datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
644             return this;
645         }
646
647         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
648             datastoreContext.setPeerAddressResolver(resolver);
649             return this;
650         }
651
652         public Builder tempFileDirectory(final String tempFileDirectory) {
653             datastoreContext.setTempFileDirectory(tempFileDirectory);
654             return this;
655         }
656
657         public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
658             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
659             return this;
660         }
661
662         public Builder syncIndexThreshold(final long syncIndexThreshold) {
663             datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
664             return this;
665         }
666
667         public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
668             datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
669             return this;
670         }
671
672         public Builder frontendRequestTimeoutInSeconds(final long timeout) {
673             datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
674             return this;
675         }
676
677         public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
678             datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
679             return this;
680         }
681
682         public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
683             datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
684             return this;
685         }
686
687         @Override
688         public DatastoreContext build() {
689             if (datastoreContext.dataStoreName != null) {
690                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
691             }
692
693             return datastoreContext;
694         }
695     }
696 }