Migrate DatastoreContextIntrospectorFactory to OSGi DS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.TimeUnit;
18 import org.apache.commons.text.WordUtils;
19 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
21 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
22 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
23 import org.opendaylight.controller.cluster.raft.ConfigParams;
24 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
25 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.duration.FiniteDuration;
31
32 /**
33  * Contains contextual data for a data store.
34  *
35  * @author Thomas Pantelis
36  */
37 // Non-final for mocking
38 public class DatastoreContext implements ClientActorConfig {
39     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
40
41     public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
42         TimeUnit.MINUTES);
43     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
44     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
45     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
46     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
47     public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
48     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
49     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
50             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
51     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
52     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
53     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
54     public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
55     public static final boolean DEFAULT_PERSISTENT = true;
56     public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false;
57     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
58     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
59     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
60     public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
61     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
62     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
63     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
64     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
65             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
66     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
67     public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
68
69     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
70
71     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
72
73     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
74
75     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
76
77     private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
78     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
79     private String dataStoreMXBeanType;
80     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
81     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
82     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
83     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
84     private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
85     private boolean persistent = DEFAULT_PERSISTENT;
86     private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE;
87     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
88     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
89     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
90     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
91     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty();
92     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
93     private boolean writeOnlyTransactionOptimizationsEnabled = true;
94     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
95     private boolean useTellBasedProtocol = false;
96     private boolean transactionDebugContextEnabled = false;
97     private String shardManagerPersistenceId;
98     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
99     private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
100     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
101     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
102     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
103     private boolean useLz4Compression = false;
104
105     public static Set<String> getGlobalDatastoreNames() {
106         return GLOBAL_DATASTORE_NAMES;
107     }
108
109     DatastoreContext() {
110         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
111         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
112         setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS);
113         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
114         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
115         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
116         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
117         setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
118         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
119         setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
120     }
121
122     private DatastoreContext(final DatastoreContext other) {
123         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
124         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
125         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
126         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
127         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
128         this.shardInitializationTimeout = other.shardInitializationTimeout;
129         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
130         this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
131         this.persistent = other.persistent;
132         this.snapshotOnRootOverwrite = other.snapshotOnRootOverwrite;
133         this.configurationReader = other.configurationReader;
134         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
135         this.dataStoreName = other.dataStoreName;
136         this.logicalStoreType = other.logicalStoreType;
137         this.storeRoot = other.storeRoot;
138         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
139         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
140         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
141         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
142         this.shardManagerPersistenceId = other.shardManagerPersistenceId;
143         this.useTellBasedProtocol = other.useTellBasedProtocol;
144         this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
145         this.requestTimeout = other.requestTimeout;
146         this.noProgressTimeout = other.noProgressTimeout;
147         this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
148         this.useLz4Compression = other.useLz4Compression;
149
150         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
151         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
152         setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds());
153         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
154         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
155         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
156         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
157         setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
158         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
159         setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
160         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
161         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
162         setTempFileDirectory(other.getTempFileDirectory());
163         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
164         setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
165     }
166
167     public static Builder newBuilder() {
168         return new Builder(new DatastoreContext());
169     }
170
171     public static Builder newBuilderFrom(final DatastoreContext context) {
172         return new Builder(new DatastoreContext(context));
173     }
174
175     public FiniteDuration getShardTransactionIdleTimeout() {
176         return shardTransactionIdleTimeout;
177     }
178
179     public String getDataStoreMXBeanType() {
180         return dataStoreMXBeanType;
181     }
182
183     public long getOperationTimeoutInMillis() {
184         return operationTimeoutInMillis;
185     }
186
187     public ConfigParams getShardRaftConfig() {
188         return raftConfig;
189     }
190
191     public int getShardTransactionCommitTimeoutInSeconds() {
192         return shardTransactionCommitTimeoutInSeconds;
193     }
194
195     public int getShardTransactionCommitQueueCapacity() {
196         return shardTransactionCommitQueueCapacity;
197     }
198
199     public Timeout getShardInitializationTimeout() {
200         return shardInitializationTimeout;
201     }
202
203     public Timeout getShardLeaderElectionTimeout() {
204         return shardLeaderElectionTimeout;
205     }
206
207     /**
208      * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards
209      * on the local node to settle.
210      *
211      * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely.
212      */
213     public int getInitialSettleTimeoutMultiplier() {
214         return initialSettleTimeoutMultiplier;
215     }
216
217     public boolean isPersistent() {
218         return persistent;
219     }
220
221     public boolean isSnapshotOnRootOverwrite() {
222         return this.snapshotOnRootOverwrite;
223     }
224
225     public AkkaConfigurationReader getConfigurationReader() {
226         return configurationReader;
227     }
228
229     public long getShardElectionTimeoutFactor() {
230         return raftConfig.getElectionTimeoutFactor();
231     }
232
233     public String getDataStoreName() {
234         return dataStoreName;
235     }
236
237     public LogicalDatastoreType getLogicalStoreType() {
238         return logicalStoreType;
239     }
240
241     public YangInstanceIdentifier getStoreRoot() {
242         return storeRoot;
243     }
244
245     public long getTransactionCreationInitialRateLimit() {
246         return transactionCreationInitialRateLimit;
247     }
248
249     public String getShardManagerPersistenceId() {
250         return shardManagerPersistenceId;
251     }
252
253     @Override
254     public String getTempFileDirectory() {
255         return raftConfig.getTempFileDirectory();
256     }
257
258     private void setTempFileDirectory(final String tempFileDirectory) {
259         raftConfig.setTempFileDirectory(tempFileDirectory);
260     }
261
262     @Override
263     public int getFileBackedStreamingThreshold() {
264         return raftConfig.getFileBackedStreamingThreshold();
265     }
266
267     private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
268         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
269     }
270
271     private void setPeerAddressResolver(final PeerAddressResolver resolver) {
272         raftConfig.setPeerAddressResolver(resolver);
273     }
274
275     private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
276         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
277                 TimeUnit.MILLISECONDS));
278     }
279
280     private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
281         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
282     }
283
284
285     private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
286         raftConfig.setIsolatedLeaderCheckInterval(
287                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
288     }
289
290     private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
291         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
292     }
293
294     private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
295         raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
296     }
297
298     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
299         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
300     }
301
302     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
303         checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100);
304         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
305     }
306
307     private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
308         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
309     }
310
311     /**
312      * Set the interval in seconds after which a snapshot should be taken during the recovery process.
313      * 0 means don't take snapshots
314      */
315     private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
316         raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
317     }
318
319     @Deprecated
320     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
321         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
322         // maximumMessageSliceSize.
323         if (shardSnapshotChunkSize < maximumMessageSliceSize) {
324             raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
325         }
326     }
327
328     private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
329         raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
330         this.maximumMessageSliceSize = maximumMessageSliceSize;
331     }
332
333     private void setSyncIndexThreshold(final long syncIndexThreshold) {
334         raftConfig.setSyncIndexThreshold(syncIndexThreshold);
335     }
336
337     public int getShardBatchedModificationCount() {
338         return shardBatchedModificationCount;
339     }
340
341     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
342         return writeOnlyTransactionOptimizationsEnabled;
343     }
344
345     public long getShardCommitQueueExpiryTimeoutInMillis() {
346         return shardCommitQueueExpiryTimeoutInMillis;
347     }
348
349     public boolean isTransactionDebugContextEnabled() {
350         return transactionDebugContextEnabled;
351     }
352
353     public boolean isUseTellBasedProtocol() {
354         return useTellBasedProtocol;
355     }
356
357     public boolean isUseLz4Compression() {
358         return useLz4Compression;
359     }
360
361     @Override
362     public int getMaximumMessageSliceSize() {
363         return maximumMessageSliceSize;
364     }
365
366     @Override
367     public long getBackendAlivenessTimerInterval() {
368         return backendAlivenessTimerInterval;
369     }
370
371     @Override
372     public long getRequestTimeout() {
373         return requestTimeout;
374     }
375
376     @Override
377     public long getNoProgressTimeout() {
378         return noProgressTimeout;
379     }
380
381     public int getInitialPayloadSerializedBufferCapacity() {
382         return initialPayloadSerializedBufferCapacity;
383     }
384
385     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
386         private final DatastoreContext datastoreContext;
387
388         Builder(final DatastoreContext datastoreContext) {
389             this.datastoreContext = datastoreContext;
390         }
391
392         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
393             // TODO - this is defined in the yang DataStoreProperties but not currently used.
394             return this;
395         }
396
397         public Builder enableMetricCapture(final boolean enableMetricCapture) {
398             // TODO - this is defined in the yang DataStoreProperties but not currently used.
399             return this;
400         }
401
402
403         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
404             datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
405             return this;
406         }
407
408         public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
409             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
410         }
411
412         public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
413             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
414             return this;
415         }
416
417         public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
418             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
419             return this;
420         }
421
422         public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
423             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
424             return this;
425         }
426
427         public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
428             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
429             return this;
430         }
431
432         public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
433             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
434             return this;
435         }
436
437         public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
438             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
439             return this;
440         }
441
442         public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) {
443             checkArgument(recoverySnapshotIntervalSeconds >= 0);
444             datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds);
445             return this;
446         }
447
448         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
449             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
450             return this;
451         }
452
453         public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
454             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
455             return this;
456         }
457
458         public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
459             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
460             return this;
461         }
462
463         public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
464             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
465             return this;
466         }
467
468         public Builder shardInitializationTimeoutInSeconds(final long timeout) {
469             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
470         }
471
472         public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
473             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
474             return this;
475         }
476
477         public Builder initialSettleTimeoutMultiplier(final int multiplier) {
478             checkArgument(multiplier >= 0);
479             datastoreContext.initialSettleTimeoutMultiplier = multiplier;
480             return this;
481         }
482
483         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
484             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
485         }
486
487         public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
488             datastoreContext.configurationReader = configurationReader;
489             return this;
490         }
491
492         public Builder persistent(final boolean persistent) {
493             datastoreContext.persistent = persistent;
494             return this;
495         }
496
497         public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) {
498             datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite;
499             return this;
500         }
501
502         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
503             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
504             return this;
505         }
506
507         public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
508             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
509             return this;
510         }
511
512         public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
513             datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
514             return this;
515         }
516
517         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
518             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
519             return this;
520         }
521
522         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
523             datastoreContext.logicalStoreType = requireNonNull(logicalStoreType);
524
525             // Retain compatible naming
526             switch (logicalStoreType) {
527                 case CONFIGURATION:
528                     dataStoreName("config");
529                     break;
530                 case OPERATIONAL:
531                     dataStoreName("operational");
532                     break;
533                 default:
534                     dataStoreName(logicalStoreType.name());
535             }
536
537             return this;
538         }
539
540         public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
541             datastoreContext.storeRoot = storeRoot;
542             return this;
543         }
544
545         public Builder dataStoreName(final String dataStoreName) {
546             datastoreContext.dataStoreName = requireNonNull(dataStoreName);
547             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
548             return this;
549         }
550
551         public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
552             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
553             return this;
554         }
555
556         public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
557             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
558             return this;
559         }
560
561         public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
562             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
563             return this;
564         }
565
566         public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
567             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
568                     value, TimeUnit.SECONDS);
569             return this;
570         }
571
572         public Builder transactionDebugContextEnabled(final boolean value) {
573             datastoreContext.transactionDebugContextEnabled = value;
574             return this;
575         }
576
577         @Deprecated(forRemoval = true)
578         public Builder maxShardDataChangeExecutorPoolSize(final int newMaxShardDataChangeExecutorPoolSize) {
579             return this;
580         }
581
582         @Deprecated(forRemoval = true)
583         public Builder maxShardDataChangeExecutorQueueSize(final int newMaxShardDataChangeExecutorQueueSize) {
584             return this;
585         }
586
587         @Deprecated(forRemoval = true)
588         public Builder maxShardDataChangeListenerQueueSize(final int newMaxShardDataChangeListenerQueueSize) {
589             return this;
590         }
591
592         @Deprecated(forRemoval = true)
593         public Builder maxShardDataStoreExecutorQueueSize(final int newMaxShardDataStoreExecutorQueueSize) {
594             return this;
595         }
596
597         public Builder useTellBasedProtocol(final boolean value) {
598             datastoreContext.useTellBasedProtocol = value;
599             return this;
600         }
601
602         public Builder useLz4Compression(final boolean value) {
603             datastoreContext.useLz4Compression = value;
604             return this;
605         }
606
607         /**
608          * For unit tests only.
609          */
610         @VisibleForTesting
611         public Builder shardManagerPersistenceId(final String id) {
612             datastoreContext.shardManagerPersistenceId = id;
613             return this;
614         }
615
616         public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
617             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
618             return this;
619         }
620
621         @Deprecated
622         public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
623             LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
624                     + "use maximum-message-slice-size instead");
625             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
626             return this;
627         }
628
629         public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
630             datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
631             return this;
632         }
633
634         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
635             datastoreContext.setPeerAddressResolver(resolver);
636             return this;
637         }
638
639         public Builder tempFileDirectory(final String tempFileDirectory) {
640             datastoreContext.setTempFileDirectory(tempFileDirectory);
641             return this;
642         }
643
644         public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
645             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
646             return this;
647         }
648
649         public Builder syncIndexThreshold(final long syncIndexThreshold) {
650             datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
651             return this;
652         }
653
654         public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
655             datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
656             return this;
657         }
658
659         public Builder frontendRequestTimeoutInSeconds(final long timeout) {
660             datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
661             return this;
662         }
663
664         public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
665             datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
666             return this;
667         }
668
669         public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
670             datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
671             return this;
672         }
673
674         @Override
675         public DatastoreContext build() {
676             if (datastoreContext.dataStoreName != null) {
677                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
678             }
679
680             return datastoreContext;
681         }
682     }
683 }