BUG-8618: make sync threshold tuneable
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.util.Timeout;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import java.util.Set;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.TimeUnit;
17 import org.apache.commons.lang3.text.WordUtils;
18 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
19 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
20 import org.opendaylight.controller.cluster.raft.ConfigParams;
21 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
22 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import scala.concurrent.duration.Duration;
27 import scala.concurrent.duration.FiniteDuration;
28
29 /**
30  * Contains contextual data for a data store.
31  *
32  * @author Thomas Pantelis
33  */
34 public class DatastoreContext {
35     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
36
37     public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
38     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
39     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
40     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
41     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
42     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
43     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
44             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
45     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
46     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
47     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
48     public static final boolean DEFAULT_PERSISTENT = true;
49     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
50     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
51     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
52     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
53     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
54     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
55     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
56             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
57     public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
58
59     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
60
61     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
62
63     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
64
65     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
66     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
67     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
68     private String dataStoreMXBeanType;
69     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
70     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
71     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
72     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
73     private boolean persistent = DEFAULT_PERSISTENT;
74     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
75     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
76     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
77     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
78     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
79     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
80     private boolean writeOnlyTransactionOptimizationsEnabled = true;
81     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
82     private boolean useTellBasedProtocol = false;
83     private boolean transactionDebugContextEnabled = false;
84     private String shardManagerPersistenceId;
85
86     public static Set<String> getGlobalDatastoreNames() {
87         return GLOBAL_DATASTORE_NAMES;
88     }
89
90     private DatastoreContext() {
91         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
92         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
93         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
94         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
95         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
96         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
97         setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
98         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
99     }
100
101     private DatastoreContext(final DatastoreContext other) {
102         this.dataStoreProperties = other.dataStoreProperties;
103         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
104         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
105         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
106         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
107         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
108         this.shardInitializationTimeout = other.shardInitializationTimeout;
109         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
110         this.persistent = other.persistent;
111         this.configurationReader = other.configurationReader;
112         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
113         this.dataStoreName = other.dataStoreName;
114         this.logicalStoreType = other.logicalStoreType;
115         this.storeRoot = other.storeRoot;
116         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
117         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
118         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
119         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
120         this.shardManagerPersistenceId = other.shardManagerPersistenceId;
121         this.useTellBasedProtocol = other.useTellBasedProtocol;
122
123         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
124         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
125         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
126         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
127         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
128         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
129         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
130         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
131         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
132         setTempFileDirectory(other.getTempFileDirectory());
133         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
134         setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
135     }
136
137     public static Builder newBuilder() {
138         return new Builder(new DatastoreContext());
139     }
140
141     public static Builder newBuilderFrom(final DatastoreContext context) {
142         return new Builder(new DatastoreContext(context));
143     }
144
145     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
146         return dataStoreProperties;
147     }
148
149     public Duration getShardTransactionIdleTimeout() {
150         return shardTransactionIdleTimeout;
151     }
152
153     public String getDataStoreMXBeanType() {
154         return dataStoreMXBeanType;
155     }
156
157     public long getOperationTimeoutInMillis() {
158         return operationTimeoutInMillis;
159     }
160
161     public ConfigParams getShardRaftConfig() {
162         return raftConfig;
163     }
164
165     public int getShardTransactionCommitTimeoutInSeconds() {
166         return shardTransactionCommitTimeoutInSeconds;
167     }
168
169     public int getShardTransactionCommitQueueCapacity() {
170         return shardTransactionCommitQueueCapacity;
171     }
172
173     public Timeout getShardInitializationTimeout() {
174         return shardInitializationTimeout;
175     }
176
177     public Timeout getShardLeaderElectionTimeout() {
178         return shardLeaderElectionTimeout;
179     }
180
181     public boolean isPersistent() {
182         return persistent;
183     }
184
185     public AkkaConfigurationReader getConfigurationReader() {
186         return configurationReader;
187     }
188
189     public long getShardElectionTimeoutFactor() {
190         return raftConfig.getElectionTimeoutFactor();
191     }
192
193     public String getDataStoreName() {
194         return dataStoreName;
195     }
196
197     public LogicalDatastoreType getLogicalStoreType() {
198         return logicalStoreType;
199     }
200
201     public YangInstanceIdentifier getStoreRoot() {
202         return storeRoot;
203     }
204
205     public long getTransactionCreationInitialRateLimit() {
206         return transactionCreationInitialRateLimit;
207     }
208
209     public String getShardManagerPersistenceId() {
210         return shardManagerPersistenceId;
211     }
212
213     public String getTempFileDirectory() {
214         return raftConfig.getTempFileDirectory();
215     }
216
217     private void setTempFileDirectory(final String tempFileDirectory) {
218         raftConfig.setTempFileDirectory(tempFileDirectory);
219     }
220
221     public int getFileBackedStreamingThreshold() {
222         return raftConfig.getFileBackedStreamingThreshold();
223     }
224
225     private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
226         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
227     }
228
229     private void setPeerAddressResolver(final PeerAddressResolver resolver) {
230         raftConfig.setPeerAddressResolver(resolver);
231     }
232
233     private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
234         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
235                 TimeUnit.MILLISECONDS));
236     }
237
238     private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
239         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
240     }
241
242
243     private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
244         raftConfig.setIsolatedLeaderCheckInterval(
245                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
246     }
247
248     private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
249         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
250     }
251
252     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
253         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
254     }
255
256     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
257         Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
258                 && shardSnapshotDataThresholdPercentage <= 100);
259         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
260     }
261
262     private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
263         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
264     }
265
266     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
267         raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
268     }
269
270     private void setSyncIndexThreshold(final long syncIndexThreshold) {
271         raftConfig.setSyncIndexThreshold(syncIndexThreshold);
272     }
273
274     public int getShardBatchedModificationCount() {
275         return shardBatchedModificationCount;
276     }
277
278     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
279         return writeOnlyTransactionOptimizationsEnabled;
280     }
281
282     public long getShardCommitQueueExpiryTimeoutInMillis() {
283         return shardCommitQueueExpiryTimeoutInMillis;
284     }
285
286     public boolean isTransactionDebugContextEnabled() {
287         return transactionDebugContextEnabled;
288     }
289
290     public boolean isUseTellBasedProtocol() {
291         return useTellBasedProtocol;
292     }
293
294     public int getShardSnapshotChunkSize() {
295         return raftConfig.getSnapshotChunkSize();
296     }
297
298     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
299         private final DatastoreContext datastoreContext;
300         private int maxShardDataChangeExecutorPoolSize =
301                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
302         private int maxShardDataChangeExecutorQueueSize =
303                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
304         private int maxShardDataChangeListenerQueueSize =
305                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
306         private int maxShardDataStoreExecutorQueueSize =
307                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
308
309         private Builder(final DatastoreContext datastoreContext) {
310             this.datastoreContext = datastoreContext;
311
312             if (datastoreContext.getDataStoreProperties() != null) {
313                 maxShardDataChangeExecutorPoolSize =
314                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
315                 maxShardDataChangeExecutorQueueSize =
316                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
317                 maxShardDataChangeListenerQueueSize =
318                         datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
319                 maxShardDataStoreExecutorQueueSize =
320                         datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
321             }
322         }
323
324         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
325             // TODO - this is defined in the yang DataStoreProperties but not currently used.
326             return this;
327         }
328
329         public Builder enableMetricCapture(final boolean enableMetricCapture) {
330             // TODO - this is defined in the yang DataStoreProperties but not currently used.
331             return this;
332         }
333
334
335         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
336             datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
337             return this;
338         }
339
340         public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
341             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
342         }
343
344         public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
345             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
346             return this;
347         }
348
349         public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
350             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
351             return this;
352         }
353
354         public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
355             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
356             return this;
357         }
358
359         public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
360             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
361             return this;
362         }
363
364         public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
365             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
366             return this;
367         }
368
369         public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
370             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
371             return this;
372         }
373
374         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
375             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
376             return this;
377         }
378
379         public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
380             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
381             return this;
382         }
383
384         public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
385             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
386             return this;
387         }
388
389         public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
390             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
391             return this;
392         }
393
394         public Builder shardInitializationTimeoutInSeconds(final long timeout) {
395             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
396         }
397
398         public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
399             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
400             return this;
401         }
402
403         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
404             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
405         }
406
407         public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
408             datastoreContext.configurationReader = configurationReader;
409             return this;
410         }
411
412         public Builder persistent(final boolean persistent) {
413             datastoreContext.persistent = persistent;
414             return this;
415         }
416
417         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
418             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
419             return this;
420         }
421
422         public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
423             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
424             return this;
425         }
426
427         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
428             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
429             return this;
430         }
431
432         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
433             datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
434
435             // Retain compatible naming
436             switch (logicalStoreType) {
437                 case CONFIGURATION:
438                     dataStoreName("config");
439                     break;
440                 case OPERATIONAL:
441                     dataStoreName("operational");
442                     break;
443                 default:
444                     dataStoreName(logicalStoreType.name());
445             }
446
447             return this;
448         }
449
450         public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
451             datastoreContext.storeRoot = storeRoot;
452             return this;
453         }
454
455         public Builder dataStoreName(final String dataStoreName) {
456             datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
457             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
458             return this;
459         }
460
461         public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
462             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
463             return this;
464         }
465
466         public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
467             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
468             return this;
469         }
470
471         public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
472             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
473             return this;
474         }
475
476         public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
477             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
478                     value, TimeUnit.SECONDS);
479             return this;
480         }
481
482         public Builder transactionDebugContextEnabled(final boolean value) {
483             datastoreContext.transactionDebugContextEnabled = value;
484             return this;
485         }
486
487         public Builder maxShardDataChangeExecutorPoolSize(final int maxShardDataChangeExecutorPoolSize) {
488             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
489             return this;
490         }
491
492         public Builder maxShardDataChangeExecutorQueueSize(final int maxShardDataChangeExecutorQueueSize) {
493             this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
494             return this;
495         }
496
497         public Builder maxShardDataChangeListenerQueueSize(final int maxShardDataChangeListenerQueueSize) {
498             this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
499             return this;
500         }
501
502         public Builder maxShardDataStoreExecutorQueueSize(final int maxShardDataStoreExecutorQueueSize) {
503             this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
504             return this;
505         }
506
507         public Builder useTellBasedProtocol(final boolean value) {
508             datastoreContext.useTellBasedProtocol = value;
509             return this;
510         }
511
512         /**
513          * For unit tests only.
514          */
515         @VisibleForTesting
516         public Builder shardManagerPersistenceId(final String id) {
517             datastoreContext.shardManagerPersistenceId = id;
518             return this;
519         }
520
521         public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
522             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
523             return this;
524         }
525
526         public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
527             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
528             return this;
529         }
530
531         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
532             datastoreContext.setPeerAddressResolver(resolver);
533             return this;
534         }
535
536         public Builder tempFileDirectory(final String tempFileDirectory) {
537             datastoreContext.setTempFileDirectory(tempFileDirectory);
538             return this;
539         }
540
541         public Builder fileBackedStreamingThresholdInMegabytes(final int  fileBackedStreamingThreshold) {
542             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
543             return this;
544         }
545
546         public Builder syncIndexThreshold(final long syncIndexThreshold) {
547             datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
548             return this;
549         }
550
551         @Override
552         public DatastoreContext build() {
553             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
554                     maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
555                     maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
556
557             if (datastoreContext.dataStoreName != null) {
558                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
559             }
560
561             return datastoreContext;
562         }
563     }
564 }