Allow shard settle timeout to be tuned
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.TimeUnit;
18 import org.apache.commons.text.WordUtils;
19 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
21 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
22 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
23 import org.opendaylight.controller.cluster.raft.ConfigParams;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStoreConfigProperties;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.duration.FiniteDuration;
32
33 /**
34  * Contains contextual data for a data store.
35  *
36  * @author Thomas Pantelis
37  */
38 // Non-final for mocking
39 public class DatastoreContext implements ClientActorConfig {
40     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
41
42     public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
43         TimeUnit.MINUTES);
44     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
45     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
46     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
47     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
48     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
49     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
50             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
51     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
52     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
53     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
54     public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
55     public static final boolean DEFAULT_PERSISTENT = true;
56     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
57     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
58     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
59     public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
60     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
61     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
62     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
63     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
64             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
65     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
66     public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
67
68     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
69
70     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
71
72     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
73
74     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
75
76     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
77     private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
78     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
79     private String dataStoreMXBeanType;
80     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
81     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
82     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
83     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
84     private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
85     private boolean persistent = DEFAULT_PERSISTENT;
86     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
87     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
88     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
89     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
90     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty();
91     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
92     private boolean writeOnlyTransactionOptimizationsEnabled = true;
93     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
94     private boolean useTellBasedProtocol = false;
95     private boolean transactionDebugContextEnabled = false;
96     private String shardManagerPersistenceId;
97     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
98     private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
99     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
100     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
101     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
102
103     public static Set<String> getGlobalDatastoreNames() {
104         return GLOBAL_DATASTORE_NAMES;
105     }
106
107     DatastoreContext() {
108         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
109         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
110         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
111         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
112         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
113         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
114         setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
115         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
116         setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
117     }
118
119     private DatastoreContext(final DatastoreContext other) {
120         this.dataStoreProperties = other.dataStoreProperties;
121         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
122         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
123         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
124         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
125         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
126         this.shardInitializationTimeout = other.shardInitializationTimeout;
127         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
128         this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
129         this.persistent = other.persistent;
130         this.configurationReader = other.configurationReader;
131         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
132         this.dataStoreName = other.dataStoreName;
133         this.logicalStoreType = other.logicalStoreType;
134         this.storeRoot = other.storeRoot;
135         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
136         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
137         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
138         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
139         this.shardManagerPersistenceId = other.shardManagerPersistenceId;
140         this.useTellBasedProtocol = other.useTellBasedProtocol;
141         this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
142         this.requestTimeout = other.requestTimeout;
143         this.noProgressTimeout = other.noProgressTimeout;
144         this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
145
146         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
147         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
148         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
149         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
150         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
151         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
152         setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
153         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
154         setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
155         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
156         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
157         setTempFileDirectory(other.getTempFileDirectory());
158         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
159         setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
160     }
161
162     public static Builder newBuilder() {
163         return new Builder(new DatastoreContext());
164     }
165
166     public static Builder newBuilderFrom(final DatastoreContext context) {
167         return new Builder(new DatastoreContext(context));
168     }
169
170     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
171         return dataStoreProperties;
172     }
173
174     public FiniteDuration getShardTransactionIdleTimeout() {
175         return shardTransactionIdleTimeout;
176     }
177
178     public String getDataStoreMXBeanType() {
179         return dataStoreMXBeanType;
180     }
181
182     public long getOperationTimeoutInMillis() {
183         return operationTimeoutInMillis;
184     }
185
186     public ConfigParams getShardRaftConfig() {
187         return raftConfig;
188     }
189
190     public int getShardTransactionCommitTimeoutInSeconds() {
191         return shardTransactionCommitTimeoutInSeconds;
192     }
193
194     public int getShardTransactionCommitQueueCapacity() {
195         return shardTransactionCommitQueueCapacity;
196     }
197
198     public Timeout getShardInitializationTimeout() {
199         return shardInitializationTimeout;
200     }
201
202     public Timeout getShardLeaderElectionTimeout() {
203         return shardLeaderElectionTimeout;
204     }
205
206     /**
207      * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards
208      * on the local node to settle.
209      *
210      * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely.
211      */
212     public int getInitialSettleTimeoutMultiplier() {
213         return initialSettleTimeoutMultiplier;
214     }
215
216     public boolean isPersistent() {
217         return persistent;
218     }
219
220     public AkkaConfigurationReader getConfigurationReader() {
221         return configurationReader;
222     }
223
224     public long getShardElectionTimeoutFactor() {
225         return raftConfig.getElectionTimeoutFactor();
226     }
227
228     public String getDataStoreName() {
229         return dataStoreName;
230     }
231
232     public LogicalDatastoreType getLogicalStoreType() {
233         return logicalStoreType;
234     }
235
236     public YangInstanceIdentifier getStoreRoot() {
237         return storeRoot;
238     }
239
240     public long getTransactionCreationInitialRateLimit() {
241         return transactionCreationInitialRateLimit;
242     }
243
244     public String getShardManagerPersistenceId() {
245         return shardManagerPersistenceId;
246     }
247
248     @Override
249     public String getTempFileDirectory() {
250         return raftConfig.getTempFileDirectory();
251     }
252
253     private void setTempFileDirectory(final String tempFileDirectory) {
254         raftConfig.setTempFileDirectory(tempFileDirectory);
255     }
256
257     @Override
258     public int getFileBackedStreamingThreshold() {
259         return raftConfig.getFileBackedStreamingThreshold();
260     }
261
262     private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
263         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
264     }
265
266     private void setPeerAddressResolver(final PeerAddressResolver resolver) {
267         raftConfig.setPeerAddressResolver(resolver);
268     }
269
270     private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
271         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
272                 TimeUnit.MILLISECONDS));
273     }
274
275     private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
276         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
277     }
278
279
280     private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
281         raftConfig.setIsolatedLeaderCheckInterval(
282                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
283     }
284
285     private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
286         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
287     }
288
289     private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
290         raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
291     }
292
293     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
294         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
295     }
296
297     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
298         checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100);
299         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
300     }
301
302     private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
303         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
304     }
305
306     @Deprecated
307     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
308         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
309         // maximumMessageSliceSize.
310         if (shardSnapshotChunkSize < maximumMessageSliceSize) {
311             raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
312         }
313     }
314
315     private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
316         raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
317         this.maximumMessageSliceSize = maximumMessageSliceSize;
318     }
319
320     private void setSyncIndexThreshold(final long syncIndexThreshold) {
321         raftConfig.setSyncIndexThreshold(syncIndexThreshold);
322     }
323
324     public int getShardBatchedModificationCount() {
325         return shardBatchedModificationCount;
326     }
327
328     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
329         return writeOnlyTransactionOptimizationsEnabled;
330     }
331
332     public long getShardCommitQueueExpiryTimeoutInMillis() {
333         return shardCommitQueueExpiryTimeoutInMillis;
334     }
335
336     public boolean isTransactionDebugContextEnabled() {
337         return transactionDebugContextEnabled;
338     }
339
340     public boolean isUseTellBasedProtocol() {
341         return useTellBasedProtocol;
342     }
343
344     @Override
345     public int getMaximumMessageSliceSize() {
346         return maximumMessageSliceSize;
347     }
348
349     @Override
350     public long getBackendAlivenessTimerInterval() {
351         return backendAlivenessTimerInterval;
352     }
353
354     @Override
355     public long getRequestTimeout() {
356         return requestTimeout;
357     }
358
359     @Override
360     public long getNoProgressTimeout() {
361         return noProgressTimeout;
362     }
363
364     public int getInitialPayloadSerializedBufferCapacity() {
365         return initialPayloadSerializedBufferCapacity;
366     }
367
368     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
369         private final DatastoreContext datastoreContext;
370         private int maxShardDataChangeExecutorPoolSize =
371                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
372         private int maxShardDataChangeExecutorQueueSize =
373                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
374         private int maxShardDataChangeListenerQueueSize =
375                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
376         private int maxShardDataStoreExecutorQueueSize =
377                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
378
379         Builder(final DatastoreContext datastoreContext) {
380             this.datastoreContext = datastoreContext;
381
382             if (datastoreContext.getDataStoreProperties() != null) {
383                 maxShardDataChangeExecutorPoolSize =
384                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
385                 maxShardDataChangeExecutorQueueSize =
386                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
387                 maxShardDataChangeListenerQueueSize =
388                         datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
389                 maxShardDataStoreExecutorQueueSize =
390                         datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
391             }
392         }
393
394         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
395             // TODO - this is defined in the yang DataStoreProperties but not currently used.
396             return this;
397         }
398
399         public Builder enableMetricCapture(final boolean enableMetricCapture) {
400             // TODO - this is defined in the yang DataStoreProperties but not currently used.
401             return this;
402         }
403
404
405         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
406             datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
407             return this;
408         }
409
410         public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
411             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
412         }
413
414         public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
415             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
416             return this;
417         }
418
419         public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
420             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
421             return this;
422         }
423
424         public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
425             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
426             return this;
427         }
428
429         public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
430             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
431             return this;
432         }
433
434         public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
435             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
436             return this;
437         }
438
439         public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
440             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
441             return this;
442         }
443
444         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
445             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
446             return this;
447         }
448
449         public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
450             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
451             return this;
452         }
453
454         public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
455             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
456             return this;
457         }
458
459         public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
460             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
461             return this;
462         }
463
464         public Builder shardInitializationTimeoutInSeconds(final long timeout) {
465             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
466         }
467
468         public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
469             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
470             return this;
471         }
472
473         public Builder initialSettleTimeoutMultiplier(final int multiplier) {
474             checkArgument(multiplier >= 0);
475             datastoreContext.initialSettleTimeoutMultiplier = multiplier;
476             return this;
477         }
478
479         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
480             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
481         }
482
483         public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
484             datastoreContext.configurationReader = configurationReader;
485             return this;
486         }
487
488         public Builder persistent(final boolean persistent) {
489             datastoreContext.persistent = persistent;
490             return this;
491         }
492
493         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
494             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
495             return this;
496         }
497
498         public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
499             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
500             return this;
501         }
502
503         public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
504             datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
505             return this;
506         }
507
508         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
509             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
510             return this;
511         }
512
513         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
514             datastoreContext.logicalStoreType = requireNonNull(logicalStoreType);
515
516             // Retain compatible naming
517             switch (logicalStoreType) {
518                 case CONFIGURATION:
519                     dataStoreName("config");
520                     break;
521                 case OPERATIONAL:
522                     dataStoreName("operational");
523                     break;
524                 default:
525                     dataStoreName(logicalStoreType.name());
526             }
527
528             return this;
529         }
530
531         public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
532             datastoreContext.storeRoot = storeRoot;
533             return this;
534         }
535
536         public Builder dataStoreName(final String dataStoreName) {
537             datastoreContext.dataStoreName = requireNonNull(dataStoreName);
538             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
539             return this;
540         }
541
542         public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
543             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
544             return this;
545         }
546
547         public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
548             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
549             return this;
550         }
551
552         public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
553             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
554             return this;
555         }
556
557         public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
558             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
559                     value, TimeUnit.SECONDS);
560             return this;
561         }
562
563         public Builder transactionDebugContextEnabled(final boolean value) {
564             datastoreContext.transactionDebugContextEnabled = value;
565             return this;
566         }
567
568         public Builder maxShardDataChangeExecutorPoolSize(final int newMaxShardDataChangeExecutorPoolSize) {
569             this.maxShardDataChangeExecutorPoolSize = newMaxShardDataChangeExecutorPoolSize;
570             return this;
571         }
572
573         public Builder maxShardDataChangeExecutorQueueSize(final int newMaxShardDataChangeExecutorQueueSize) {
574             this.maxShardDataChangeExecutorQueueSize = newMaxShardDataChangeExecutorQueueSize;
575             return this;
576         }
577
578         public Builder maxShardDataChangeListenerQueueSize(final int newMaxShardDataChangeListenerQueueSize) {
579             this.maxShardDataChangeListenerQueueSize = newMaxShardDataChangeListenerQueueSize;
580             return this;
581         }
582
583         public Builder maxShardDataStoreExecutorQueueSize(final int newMaxShardDataStoreExecutorQueueSize) {
584             this.maxShardDataStoreExecutorQueueSize = newMaxShardDataStoreExecutorQueueSize;
585             return this;
586         }
587
588         public Builder useTellBasedProtocol(final boolean value) {
589             datastoreContext.useTellBasedProtocol = value;
590             return this;
591         }
592
593         /**
594          * For unit tests only.
595          */
596         @VisibleForTesting
597         public Builder shardManagerPersistenceId(final String id) {
598             datastoreContext.shardManagerPersistenceId = id;
599             return this;
600         }
601
602         public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
603             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
604             return this;
605         }
606
607         @Deprecated
608         public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
609             LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
610                     + "use maximum-message-slice-size instead");
611             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
612             return this;
613         }
614
615         public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
616             datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
617             return this;
618         }
619
620         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
621             datastoreContext.setPeerAddressResolver(resolver);
622             return this;
623         }
624
625         public Builder tempFileDirectory(final String tempFileDirectory) {
626             datastoreContext.setTempFileDirectory(tempFileDirectory);
627             return this;
628         }
629
630         public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
631             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
632             return this;
633         }
634
635         public Builder syncIndexThreshold(final long syncIndexThreshold) {
636             datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
637             return this;
638         }
639
640         public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
641             datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
642             return this;
643         }
644
645         public Builder frontendRequestTimeoutInSeconds(final long timeout) {
646             datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
647             return this;
648         }
649
650         public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
651             datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
652             return this;
653         }
654
655         public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
656             datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
657             return this;
658         }
659
660         @Override
661         public DatastoreContext build() {
662             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.builder()
663                     .maxDataChangeExecutorPoolSize(maxShardDataChangeExecutorPoolSize)
664                     .maxDataChangeExecutorQueueSize(maxShardDataChangeExecutorQueueSize)
665                     .maxDataChangeListenerQueueSize(maxShardDataChangeListenerQueueSize)
666                     .maxDataStoreExecutorQueueSize(maxShardDataStoreExecutorQueueSize)
667                     .build();
668
669             if (datastoreContext.dataStoreName != null) {
670                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
671             }
672
673             return datastoreContext;
674         }
675     }
676 }