890e1030674d3e7a3c68dbb8784a05aa82b7da1b
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.util.Timeout;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import java.util.Set;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.TimeUnit;
17 import org.apache.commons.lang3.text.WordUtils;
18 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
19 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
20 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
21 import org.opendaylight.controller.cluster.raft.ConfigParams;
22 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
23 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
24 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
25 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.duration.Duration;
30 import scala.concurrent.duration.FiniteDuration;
31
32 /**
33  * Contains contextual data for a data store.
34  *
35  * @author Thomas Pantelis
36  */
37 public class DatastoreContext implements ClientActorConfig {
38     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
39
40     public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
41     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
42     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
43     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
44     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
45     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
46     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
47             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
48     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
49     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
50     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
51     public static final boolean DEFAULT_PERSISTENT = true;
52     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
53     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
54     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
55     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
56     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
57     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
58     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
59             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
60     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
61
62     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
63
64     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
65
66     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
67
68     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
69
70     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
71     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
72     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
73     private String dataStoreMXBeanType;
74     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
75     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
76     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
77     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
78     private boolean persistent = DEFAULT_PERSISTENT;
79     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
80     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
81     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
82     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
83     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
84     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
85     private boolean writeOnlyTransactionOptimizationsEnabled = true;
86     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
87     private boolean useTellBasedProtocol = false;
88     private boolean transactionDebugContextEnabled = false;
89     private String shardManagerPersistenceId;
90     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
91
92     public static Set<String> getGlobalDatastoreNames() {
93         return GLOBAL_DATASTORE_NAMES;
94     }
95
96     private DatastoreContext() {
97         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
98         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
99         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
100         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
101         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
102         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
103         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
104         setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
105     }
106
107     private DatastoreContext(final DatastoreContext other) {
108         this.dataStoreProperties = other.dataStoreProperties;
109         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
110         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
111         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
112         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
113         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
114         this.shardInitializationTimeout = other.shardInitializationTimeout;
115         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
116         this.persistent = other.persistent;
117         this.configurationReader = other.configurationReader;
118         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
119         this.dataStoreName = other.dataStoreName;
120         this.logicalStoreType = other.logicalStoreType;
121         this.storeRoot = other.storeRoot;
122         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
123         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
124         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
125         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
126         this.shardManagerPersistenceId = other.shardManagerPersistenceId;
127         this.useTellBasedProtocol = other.useTellBasedProtocol;
128
129         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
130         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
131         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
132         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
133         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
134         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
135         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
136         setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
137         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
138         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
139         setTempFileDirectory(other.getTempFileDirectory());
140         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
141         setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
142     }
143
144     public static Builder newBuilder() {
145         return new Builder(new DatastoreContext());
146     }
147
148     public static Builder newBuilderFrom(final DatastoreContext context) {
149         return new Builder(new DatastoreContext(context));
150     }
151
152     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
153         return dataStoreProperties;
154     }
155
156     public Duration getShardTransactionIdleTimeout() {
157         return shardTransactionIdleTimeout;
158     }
159
160     public String getDataStoreMXBeanType() {
161         return dataStoreMXBeanType;
162     }
163
164     public long getOperationTimeoutInMillis() {
165         return operationTimeoutInMillis;
166     }
167
168     public ConfigParams getShardRaftConfig() {
169         return raftConfig;
170     }
171
172     public int getShardTransactionCommitTimeoutInSeconds() {
173         return shardTransactionCommitTimeoutInSeconds;
174     }
175
176     public int getShardTransactionCommitQueueCapacity() {
177         return shardTransactionCommitQueueCapacity;
178     }
179
180     public Timeout getShardInitializationTimeout() {
181         return shardInitializationTimeout;
182     }
183
184     public Timeout getShardLeaderElectionTimeout() {
185         return shardLeaderElectionTimeout;
186     }
187
188     public boolean isPersistent() {
189         return persistent;
190     }
191
192     public AkkaConfigurationReader getConfigurationReader() {
193         return configurationReader;
194     }
195
196     public long getShardElectionTimeoutFactor() {
197         return raftConfig.getElectionTimeoutFactor();
198     }
199
200     public String getDataStoreName() {
201         return dataStoreName;
202     }
203
204     public LogicalDatastoreType getLogicalStoreType() {
205         return logicalStoreType;
206     }
207
208     public YangInstanceIdentifier getStoreRoot() {
209         return storeRoot;
210     }
211
212     public long getTransactionCreationInitialRateLimit() {
213         return transactionCreationInitialRateLimit;
214     }
215
216     public String getShardManagerPersistenceId() {
217         return shardManagerPersistenceId;
218     }
219
220     @Override
221     public String getTempFileDirectory() {
222         return raftConfig.getTempFileDirectory();
223     }
224
225     private void setTempFileDirectory(final String tempFileDirectory) {
226         raftConfig.setTempFileDirectory(tempFileDirectory);
227     }
228
229     @Override
230     public int getFileBackedStreamingThreshold() {
231         return raftConfig.getFileBackedStreamingThreshold();
232     }
233
234     private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
235         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
236     }
237
238     private void setPeerAddressResolver(final PeerAddressResolver resolver) {
239         raftConfig.setPeerAddressResolver(resolver);
240     }
241
242     private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
243         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
244                 TimeUnit.MILLISECONDS));
245     }
246
247     private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
248         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
249     }
250
251
252     private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
253         raftConfig.setIsolatedLeaderCheckInterval(
254                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
255     }
256
257     private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
258         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
259     }
260
261     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
262         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
263     }
264
265     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
266         Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
267                 && shardSnapshotDataThresholdPercentage <= 100);
268         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
269     }
270
271     private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
272         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
273     }
274
275     @Deprecated
276     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
277         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
278         // maximumMessageSliceSize.
279         if (shardSnapshotChunkSize < maximumMessageSliceSize) {
280             raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
281         }
282     }
283
284     private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
285         raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
286         this.maximumMessageSliceSize = maximumMessageSliceSize;
287     }
288
289     private void setSyncIndexThreshold(final long syncIndexThreshold) {
290         raftConfig.setSyncIndexThreshold(syncIndexThreshold);
291     }
292
293     public int getShardBatchedModificationCount() {
294         return shardBatchedModificationCount;
295     }
296
297     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
298         return writeOnlyTransactionOptimizationsEnabled;
299     }
300
301     public long getShardCommitQueueExpiryTimeoutInMillis() {
302         return shardCommitQueueExpiryTimeoutInMillis;
303     }
304
305     public boolean isTransactionDebugContextEnabled() {
306         return transactionDebugContextEnabled;
307     }
308
309     public boolean isUseTellBasedProtocol() {
310         return useTellBasedProtocol;
311     }
312
313     @Override
314     public int getMaximumMessageSliceSize() {
315         return maximumMessageSliceSize;
316     }
317
318     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
319         private final DatastoreContext datastoreContext;
320         private int maxShardDataChangeExecutorPoolSize =
321                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
322         private int maxShardDataChangeExecutorQueueSize =
323                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
324         private int maxShardDataChangeListenerQueueSize =
325                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
326         private int maxShardDataStoreExecutorQueueSize =
327                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
328
329         private Builder(final DatastoreContext datastoreContext) {
330             this.datastoreContext = datastoreContext;
331
332             if (datastoreContext.getDataStoreProperties() != null) {
333                 maxShardDataChangeExecutorPoolSize =
334                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
335                 maxShardDataChangeExecutorQueueSize =
336                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
337                 maxShardDataChangeListenerQueueSize =
338                         datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
339                 maxShardDataStoreExecutorQueueSize =
340                         datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
341             }
342         }
343
344         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
345             // TODO - this is defined in the yang DataStoreProperties but not currently used.
346             return this;
347         }
348
349         public Builder enableMetricCapture(final boolean enableMetricCapture) {
350             // TODO - this is defined in the yang DataStoreProperties but not currently used.
351             return this;
352         }
353
354
355         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
356             datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
357             return this;
358         }
359
360         public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
361             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
362         }
363
364         public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
365             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
366             return this;
367         }
368
369         public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
370             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
371             return this;
372         }
373
374         public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
375             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
376             return this;
377         }
378
379         public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
380             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
381             return this;
382         }
383
384         public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
385             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
386             return this;
387         }
388
389         public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
390             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
391             return this;
392         }
393
394         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
395             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
396             return this;
397         }
398
399         public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
400             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
401             return this;
402         }
403
404         public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
405             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
406             return this;
407         }
408
409         public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
410             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
411             return this;
412         }
413
414         public Builder shardInitializationTimeoutInSeconds(final long timeout) {
415             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
416         }
417
418         public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
419             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
420             return this;
421         }
422
423         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
424             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
425         }
426
427         public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
428             datastoreContext.configurationReader = configurationReader;
429             return this;
430         }
431
432         public Builder persistent(final boolean persistent) {
433             datastoreContext.persistent = persistent;
434             return this;
435         }
436
437         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
438             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
439             return this;
440         }
441
442         public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
443             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
444             return this;
445         }
446
447         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
448             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
449             return this;
450         }
451
452         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
453             datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
454
455             // Retain compatible naming
456             switch (logicalStoreType) {
457                 case CONFIGURATION:
458                     dataStoreName("config");
459                     break;
460                 case OPERATIONAL:
461                     dataStoreName("operational");
462                     break;
463                 default:
464                     dataStoreName(logicalStoreType.name());
465             }
466
467             return this;
468         }
469
470         public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
471             datastoreContext.storeRoot = storeRoot;
472             return this;
473         }
474
475         public Builder dataStoreName(final String dataStoreName) {
476             datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
477             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
478             return this;
479         }
480
481         public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
482             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
483             return this;
484         }
485
486         public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
487             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
488             return this;
489         }
490
491         public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
492             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
493             return this;
494         }
495
496         public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
497             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
498                     value, TimeUnit.SECONDS);
499             return this;
500         }
501
502         public Builder transactionDebugContextEnabled(final boolean value) {
503             datastoreContext.transactionDebugContextEnabled = value;
504             return this;
505         }
506
507         public Builder maxShardDataChangeExecutorPoolSize(final int maxShardDataChangeExecutorPoolSize) {
508             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
509             return this;
510         }
511
512         public Builder maxShardDataChangeExecutorQueueSize(final int maxShardDataChangeExecutorQueueSize) {
513             this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
514             return this;
515         }
516
517         public Builder maxShardDataChangeListenerQueueSize(final int maxShardDataChangeListenerQueueSize) {
518             this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
519             return this;
520         }
521
522         public Builder maxShardDataStoreExecutorQueueSize(final int maxShardDataStoreExecutorQueueSize) {
523             this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
524             return this;
525         }
526
527         public Builder useTellBasedProtocol(final boolean value) {
528             datastoreContext.useTellBasedProtocol = value;
529             return this;
530         }
531
532         /**
533          * For unit tests only.
534          */
535         @VisibleForTesting
536         public Builder shardManagerPersistenceId(final String id) {
537             datastoreContext.shardManagerPersistenceId = id;
538             return this;
539         }
540
541         public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
542             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
543             return this;
544         }
545
546         @Deprecated
547         public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
548             LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
549                     + "use maximum-message-slice-size instead");
550             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
551             return this;
552         }
553
554         public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
555             datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
556             return this;
557         }
558
559         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
560             datastoreContext.setPeerAddressResolver(resolver);
561             return this;
562         }
563
564         public Builder tempFileDirectory(final String tempFileDirectory) {
565             datastoreContext.setTempFileDirectory(tempFileDirectory);
566             return this;
567         }
568
569         public Builder fileBackedStreamingThresholdInMegabytes(final int  fileBackedStreamingThreshold) {
570             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
571             return this;
572         }
573
574         public Builder syncIndexThreshold(final long syncIndexThreshold) {
575             datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
576             return this;
577         }
578
579         @Override
580         public DatastoreContext build() {
581             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
582                     maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
583                     maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
584
585             if (datastoreContext.dataStoreName != null) {
586                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
587             }
588
589             return datastoreContext;
590         }
591     }
592 }