Fix followerDistributedDataStore tear down
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.TimeUnit;
18 import org.apache.commons.text.WordUtils;
19 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
21 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
22 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
23 import org.opendaylight.controller.cluster.raft.ConfigParams;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.duration.FiniteDuration;
32
33 /**
34  * Contains contextual data for a data store.
35  *
36  * @author Thomas Pantelis
37  */
38 // Non-final for mocking
39 public class DatastoreContext implements ClientActorConfig {
40     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
41
42     public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
43         TimeUnit.MINUTES);
44     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
45     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
46     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
47     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
48     public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
49     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
50     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
51             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
52     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
53     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
54     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
55     public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
56     public static final boolean DEFAULT_PERSISTENT = true;
57     public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false;
58     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
59     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
60     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD = 0;
61     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
62     public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
63     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
64     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
65     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
66     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
67             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
68     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 480 * 1024; // 480KiB
69     public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
70     public static final ExportOnRecovery DEFAULT_EXPORT_ON_RECOVERY = ExportOnRecovery.Off;
71     public static final String DEFAULT_RECOVERY_EXPORT_BASE_DIR = "persistence-export";
72
73     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
74
75     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
76
77     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
78
79     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
80
81     private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
82     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
83     private String dataStoreMXBeanType;
84     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
85     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
86     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
87     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
88     private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
89     private boolean persistent = DEFAULT_PERSISTENT;
90     private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE;
91     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
92     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
93     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
94     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
95     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty();
96     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
97     private boolean writeOnlyTransactionOptimizationsEnabled = true;
98     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
99     private boolean useTellBasedProtocol = true;
100     private boolean transactionDebugContextEnabled = false;
101     private String shardManagerPersistenceId;
102     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
103     private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
104     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
105     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
106     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
107     private boolean useLz4Compression = false;
108     private ExportOnRecovery exportOnRecovery = DEFAULT_EXPORT_ON_RECOVERY;
109     private String recoveryExportBaseDir = DEFAULT_RECOVERY_EXPORT_BASE_DIR;
110
111     public static Set<String> getGlobalDatastoreNames() {
112         return GLOBAL_DATASTORE_NAMES;
113     }
114
115     DatastoreContext() {
116         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
117         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
118         setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS);
119         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
120         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
121         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
122         setSnapshotDataThreshold(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD);
123         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
124         setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
125         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
126         setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
127     }
128
129     private DatastoreContext(final DatastoreContext other) {
130         shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
131         operationTimeoutInMillis = other.operationTimeoutInMillis;
132         dataStoreMXBeanType = other.dataStoreMXBeanType;
133         shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
134         shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
135         shardInitializationTimeout = other.shardInitializationTimeout;
136         shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
137         initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
138         persistent = other.persistent;
139         snapshotOnRootOverwrite = other.snapshotOnRootOverwrite;
140         configurationReader = other.configurationReader;
141         transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
142         dataStoreName = other.dataStoreName;
143         logicalStoreType = other.logicalStoreType;
144         storeRoot = other.storeRoot;
145         shardBatchedModificationCount = other.shardBatchedModificationCount;
146         writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
147         shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
148         transactionDebugContextEnabled = other.transactionDebugContextEnabled;
149         shardManagerPersistenceId = other.shardManagerPersistenceId;
150         useTellBasedProtocol = other.useTellBasedProtocol;
151         backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
152         requestTimeout = other.requestTimeout;
153         noProgressTimeout = other.noProgressTimeout;
154         initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
155         useLz4Compression = other.useLz4Compression;
156         exportOnRecovery = other.exportOnRecovery;
157         recoveryExportBaseDir = other.recoveryExportBaseDir;
158
159         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
160         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
161         setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds());
162         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
163         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
164         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
165         setSnapshotDataThreshold(other.raftConfig.getSnapshotDataThreshold());
166         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
167         setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
168         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
169         setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
170         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
171         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
172         setTempFileDirectory(other.getTempFileDirectory());
173         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
174         setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
175     }
176
177     public static Builder newBuilder() {
178         return new Builder(new DatastoreContext());
179     }
180
181     public static Builder newBuilderFrom(final DatastoreContext context) {
182         return new Builder(new DatastoreContext(context));
183     }
184
185     public FiniteDuration getShardTransactionIdleTimeout() {
186         return shardTransactionIdleTimeout;
187     }
188
189     public String getDataStoreMXBeanType() {
190         return dataStoreMXBeanType;
191     }
192
193     public long getOperationTimeoutInMillis() {
194         return operationTimeoutInMillis;
195     }
196
197     public ConfigParams getShardRaftConfig() {
198         return raftConfig;
199     }
200
201     public int getShardTransactionCommitTimeoutInSeconds() {
202         return shardTransactionCommitTimeoutInSeconds;
203     }
204
205     public int getShardTransactionCommitQueueCapacity() {
206         return shardTransactionCommitQueueCapacity;
207     }
208
209     public Timeout getShardInitializationTimeout() {
210         return shardInitializationTimeout;
211     }
212
213     public Timeout getShardLeaderElectionTimeout() {
214         return shardLeaderElectionTimeout;
215     }
216
217     /**
218      * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards
219      * on the local node to settle.
220      *
221      * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely.
222      */
223     public int getInitialSettleTimeoutMultiplier() {
224         return initialSettleTimeoutMultiplier;
225     }
226
227     public boolean isPersistent() {
228         return persistent;
229     }
230
231     public boolean isSnapshotOnRootOverwrite() {
232         return snapshotOnRootOverwrite;
233     }
234
235     public AkkaConfigurationReader getConfigurationReader() {
236         return configurationReader;
237     }
238
239     public long getShardElectionTimeoutFactor() {
240         return raftConfig.getElectionTimeoutFactor();
241     }
242
243     public String getDataStoreName() {
244         return dataStoreName;
245     }
246
247     public LogicalDatastoreType getLogicalStoreType() {
248         return logicalStoreType;
249     }
250
251     public YangInstanceIdentifier getStoreRoot() {
252         return storeRoot;
253     }
254
255     public long getTransactionCreationInitialRateLimit() {
256         return transactionCreationInitialRateLimit;
257     }
258
259     public String getShardManagerPersistenceId() {
260         return shardManagerPersistenceId;
261     }
262
263     @Override
264     public String getTempFileDirectory() {
265         return raftConfig.getTempFileDirectory();
266     }
267
268     private void setTempFileDirectory(final String tempFileDirectory) {
269         raftConfig.setTempFileDirectory(tempFileDirectory);
270     }
271
272     @Override
273     public int getFileBackedStreamingThreshold() {
274         return raftConfig.getFileBackedStreamingThreshold();
275     }
276
277     private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
278         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
279     }
280
281     private void setPeerAddressResolver(final PeerAddressResolver resolver) {
282         raftConfig.setPeerAddressResolver(resolver);
283     }
284
285     private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
286         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
287                 TimeUnit.MILLISECONDS));
288     }
289
290     private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
291         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
292     }
293
294
295     private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
296         raftConfig.setIsolatedLeaderCheckInterval(
297                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
298     }
299
300     private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
301         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
302     }
303
304     private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
305         raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
306     }
307
308     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
309         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
310     }
311
312     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
313         checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100);
314         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
315     }
316
317     private void setSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
318         checkArgument(shardSnapshotDataThreshold >= 0);
319         raftConfig.setSnapshotDataThreshold(shardSnapshotDataThreshold);
320     }
321
322     private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
323         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
324     }
325
326     /**
327      * Set the interval in seconds after which a snapshot should be taken during the recovery process.
328      * 0 means don't take snapshots
329      */
330     private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
331         raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
332     }
333
334     @Deprecated
335     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
336         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
337         // maximumMessageSliceSize.
338         if (shardSnapshotChunkSize < maximumMessageSliceSize) {
339             raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
340         }
341     }
342
343     private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
344         raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
345         this.maximumMessageSliceSize = maximumMessageSliceSize;
346     }
347
348     private void setSyncIndexThreshold(final long syncIndexThreshold) {
349         raftConfig.setSyncIndexThreshold(syncIndexThreshold);
350     }
351
352     public int getShardBatchedModificationCount() {
353         return shardBatchedModificationCount;
354     }
355
356     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
357         return writeOnlyTransactionOptimizationsEnabled;
358     }
359
360     public long getShardCommitQueueExpiryTimeoutInMillis() {
361         return shardCommitQueueExpiryTimeoutInMillis;
362     }
363
364     public boolean isTransactionDebugContextEnabled() {
365         return transactionDebugContextEnabled;
366     }
367
368     public boolean isUseTellBasedProtocol() {
369         return useTellBasedProtocol;
370     }
371
372     public boolean isUseLz4Compression() {
373         return useLz4Compression;
374     }
375
376     public ExportOnRecovery getExportOnRecovery() {
377         return exportOnRecovery;
378     }
379
380     public String getRecoveryExportBaseDir() {
381         return recoveryExportBaseDir;
382     }
383
384     @Override
385     public int getMaximumMessageSliceSize() {
386         return maximumMessageSliceSize;
387     }
388
389     @Override
390     public long getBackendAlivenessTimerInterval() {
391         return backendAlivenessTimerInterval;
392     }
393
394     @Override
395     public long getRequestTimeout() {
396         return requestTimeout;
397     }
398
399     @Override
400     public long getNoProgressTimeout() {
401         return noProgressTimeout;
402     }
403
404     public int getInitialPayloadSerializedBufferCapacity() {
405         return initialPayloadSerializedBufferCapacity;
406     }
407
408     public static class Builder {
409         private final DatastoreContext datastoreContext;
410
411         Builder(final DatastoreContext datastoreContext) {
412             this.datastoreContext = datastoreContext;
413         }
414
415         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
416             // TODO - this is defined in the yang DataStoreProperties but not currently used.
417             return this;
418         }
419
420         public Builder enableMetricCapture(final boolean enableMetricCapture) {
421             // TODO - this is defined in the yang DataStoreProperties but not currently used.
422             return this;
423         }
424
425
426         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
427             datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
428             return this;
429         }
430
431         public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
432             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
433         }
434
435         public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
436             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
437             return this;
438         }
439
440         public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
441             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
442             return this;
443         }
444
445         public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
446             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
447             return this;
448         }
449
450         public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
451             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
452             return this;
453         }
454
455         public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
456             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
457             return this;
458         }
459
460         public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
461             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
462             return this;
463         }
464
465         public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) {
466             checkArgument(recoverySnapshotIntervalSeconds >= 0);
467             datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds);
468             return this;
469         }
470
471         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
472             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
473             return this;
474         }
475
476         public Builder shardSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
477             datastoreContext.setSnapshotDataThreshold(shardSnapshotDataThreshold);
478             return this;
479         }
480
481         public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
482             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
483             return this;
484         }
485
486         public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
487             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
488             return this;
489         }
490
491         public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
492             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
493             return this;
494         }
495
496         public Builder shardInitializationTimeoutInSeconds(final long timeout) {
497             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
498         }
499
500         public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
501             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
502             return this;
503         }
504
505         public Builder initialSettleTimeoutMultiplier(final int multiplier) {
506             checkArgument(multiplier >= 0);
507             datastoreContext.initialSettleTimeoutMultiplier = multiplier;
508             return this;
509         }
510
511         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
512             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
513         }
514
515         public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
516             datastoreContext.configurationReader = configurationReader;
517             return this;
518         }
519
520         public Builder persistent(final boolean persistent) {
521             datastoreContext.persistent = persistent;
522             return this;
523         }
524
525         public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) {
526             datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite;
527             return this;
528         }
529
530         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
531             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
532             return this;
533         }
534
535         public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
536             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
537             return this;
538         }
539
540         public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
541             datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
542             return this;
543         }
544
545         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
546             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
547             return this;
548         }
549
550         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
551             datastoreContext.logicalStoreType = requireNonNull(logicalStoreType);
552
553             // Retain compatible naming
554             switch (logicalStoreType) {
555                 case CONFIGURATION:
556                     dataStoreName("config");
557                     break;
558                 case OPERATIONAL:
559                     dataStoreName("operational");
560                     break;
561                 default:
562                     dataStoreName(logicalStoreType.name());
563             }
564
565             return this;
566         }
567
568         public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
569             datastoreContext.storeRoot = storeRoot;
570             return this;
571         }
572
573         public Builder dataStoreName(final String dataStoreName) {
574             datastoreContext.dataStoreName = requireNonNull(dataStoreName);
575             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
576             return this;
577         }
578
579         public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
580             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
581             return this;
582         }
583
584         public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
585             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
586             return this;
587         }
588
589         public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
590             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
591             return this;
592         }
593
594         public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
595             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
596                     value, TimeUnit.SECONDS);
597             return this;
598         }
599
600         public Builder transactionDebugContextEnabled(final boolean value) {
601             datastoreContext.transactionDebugContextEnabled = value;
602             return this;
603         }
604
605         public Builder useTellBasedProtocol(final boolean value) {
606             datastoreContext.useTellBasedProtocol = value;
607             return this;
608         }
609
610         public Builder useLz4Compression(final boolean value) {
611             datastoreContext.useLz4Compression = value;
612             return this;
613         }
614
615         public Builder exportOnRecovery(final ExportOnRecovery value) {
616             datastoreContext.exportOnRecovery = value;
617             return this;
618         }
619
620         public Builder recoveryExportBaseDir(final String value) {
621             datastoreContext.recoveryExportBaseDir = value;
622             return this;
623         }
624
625         /**
626          * For unit tests only.
627          */
628         @VisibleForTesting
629         public Builder shardManagerPersistenceId(final String id) {
630             datastoreContext.shardManagerPersistenceId = id;
631             return this;
632         }
633
634         public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
635             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
636             return this;
637         }
638
639         @Deprecated
640         public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
641             LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
642                     + "use maximum-message-slice-size instead");
643             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
644             return this;
645         }
646
647         public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
648             datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
649             return this;
650         }
651
652         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
653             datastoreContext.setPeerAddressResolver(resolver);
654             return this;
655         }
656
657         public Builder tempFileDirectory(final String tempFileDirectory) {
658             datastoreContext.setTempFileDirectory(tempFileDirectory);
659             return this;
660         }
661
662         public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
663             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
664             return this;
665         }
666
667         public Builder syncIndexThreshold(final long syncIndexThreshold) {
668             datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
669             return this;
670         }
671
672         public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
673             datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
674             return this;
675         }
676
677         public Builder frontendRequestTimeoutInSeconds(final long timeout) {
678             datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
679             return this;
680         }
681
682         public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
683             datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
684             return this;
685         }
686
687         public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
688             datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
689             return this;
690         }
691
692         public DatastoreContext build() {
693             if (datastoreContext.dataStoreName != null) {
694                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
695             }
696
697             return datastoreContext;
698         }
699     }
700 }