Bug 7449: Add maximum-message-slice-size config param
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.util.Timeout;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import java.util.Set;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.TimeUnit;
17 import org.apache.commons.lang3.text.WordUtils;
18 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
19 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
20 import org.opendaylight.controller.cluster.raft.ConfigParams;
21 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
22 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.duration.Duration;
29 import scala.concurrent.duration.FiniteDuration;
30
31 /**
32  * Contains contextual data for a data store.
33  *
34  * @author Thomas Pantelis
35  */
36 public class DatastoreContext {
37     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
38
39     public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
40     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
41     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
42     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
43     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
44     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
45     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
46             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
47     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
48     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
49     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
50     public static final boolean DEFAULT_PERSISTENT = true;
51     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
52     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
53     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
54     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
55     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
56     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
57     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
58             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
59     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
60
61     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
62
63     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
64
65     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
66
67     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
68
69     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
70     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
71     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
72     private String dataStoreMXBeanType;
73     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
74     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
75     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
76     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
77     private boolean persistent = DEFAULT_PERSISTENT;
78     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
79     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
80     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
81     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
82     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
83     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
84     private boolean writeOnlyTransactionOptimizationsEnabled = true;
85     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
86     private boolean useTellBasedProtocol = false;
87     private boolean transactionDebugContextEnabled = false;
88     private String shardManagerPersistenceId;
89     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
90
91     public static Set<String> getGlobalDatastoreNames() {
92         return GLOBAL_DATASTORE_NAMES;
93     }
94
95     private DatastoreContext() {
96         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
97         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
98         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
99         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
100         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
101         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
102         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
103         setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
104     }
105
106     private DatastoreContext(final DatastoreContext other) {
107         this.dataStoreProperties = other.dataStoreProperties;
108         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
109         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
110         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
111         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
112         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
113         this.shardInitializationTimeout = other.shardInitializationTimeout;
114         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
115         this.persistent = other.persistent;
116         this.configurationReader = other.configurationReader;
117         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
118         this.dataStoreName = other.dataStoreName;
119         this.logicalStoreType = other.logicalStoreType;
120         this.storeRoot = other.storeRoot;
121         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
122         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
123         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
124         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
125         this.shardManagerPersistenceId = other.shardManagerPersistenceId;
126         this.useTellBasedProtocol = other.useTellBasedProtocol;
127
128         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
129         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
130         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
131         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
132         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
133         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
134         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
135         setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
136         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
137         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
138         setTempFileDirectory(other.getTempFileDirectory());
139         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
140         setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
141     }
142
143     public static Builder newBuilder() {
144         return new Builder(new DatastoreContext());
145     }
146
147     public static Builder newBuilderFrom(final DatastoreContext context) {
148         return new Builder(new DatastoreContext(context));
149     }
150
151     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
152         return dataStoreProperties;
153     }
154
155     public Duration getShardTransactionIdleTimeout() {
156         return shardTransactionIdleTimeout;
157     }
158
159     public String getDataStoreMXBeanType() {
160         return dataStoreMXBeanType;
161     }
162
163     public long getOperationTimeoutInMillis() {
164         return operationTimeoutInMillis;
165     }
166
167     public ConfigParams getShardRaftConfig() {
168         return raftConfig;
169     }
170
171     public int getShardTransactionCommitTimeoutInSeconds() {
172         return shardTransactionCommitTimeoutInSeconds;
173     }
174
175     public int getShardTransactionCommitQueueCapacity() {
176         return shardTransactionCommitQueueCapacity;
177     }
178
179     public Timeout getShardInitializationTimeout() {
180         return shardInitializationTimeout;
181     }
182
183     public Timeout getShardLeaderElectionTimeout() {
184         return shardLeaderElectionTimeout;
185     }
186
187     public boolean isPersistent() {
188         return persistent;
189     }
190
191     public AkkaConfigurationReader getConfigurationReader() {
192         return configurationReader;
193     }
194
195     public long getShardElectionTimeoutFactor() {
196         return raftConfig.getElectionTimeoutFactor();
197     }
198
199     public String getDataStoreName() {
200         return dataStoreName;
201     }
202
203     public LogicalDatastoreType getLogicalStoreType() {
204         return logicalStoreType;
205     }
206
207     public YangInstanceIdentifier getStoreRoot() {
208         return storeRoot;
209     }
210
211     public long getTransactionCreationInitialRateLimit() {
212         return transactionCreationInitialRateLimit;
213     }
214
215     public String getShardManagerPersistenceId() {
216         return shardManagerPersistenceId;
217     }
218
219     public String getTempFileDirectory() {
220         return raftConfig.getTempFileDirectory();
221     }
222
223     private void setTempFileDirectory(final String tempFileDirectory) {
224         raftConfig.setTempFileDirectory(tempFileDirectory);
225     }
226
227     public int getFileBackedStreamingThreshold() {
228         return raftConfig.getFileBackedStreamingThreshold();
229     }
230
231     private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
232         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
233     }
234
235     private void setPeerAddressResolver(final PeerAddressResolver resolver) {
236         raftConfig.setPeerAddressResolver(resolver);
237     }
238
239     private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
240         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
241                 TimeUnit.MILLISECONDS));
242     }
243
244     private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
245         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
246     }
247
248
249     private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
250         raftConfig.setIsolatedLeaderCheckInterval(
251                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
252     }
253
254     private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
255         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
256     }
257
258     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
259         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
260     }
261
262     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
263         Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
264                 && shardSnapshotDataThresholdPercentage <= 100);
265         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
266     }
267
268     private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
269         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
270     }
271
272     @Deprecated
273     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
274         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
275         // maximumMessageSliceSize.
276         if (shardSnapshotChunkSize < maximumMessageSliceSize) {
277             raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
278         }
279     }
280
281     private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
282         raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
283         this.maximumMessageSliceSize = maximumMessageSliceSize;
284     }
285
286     private void setSyncIndexThreshold(final long syncIndexThreshold) {
287         raftConfig.setSyncIndexThreshold(syncIndexThreshold);
288     }
289
290     public int getShardBatchedModificationCount() {
291         return shardBatchedModificationCount;
292     }
293
294     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
295         return writeOnlyTransactionOptimizationsEnabled;
296     }
297
298     public long getShardCommitQueueExpiryTimeoutInMillis() {
299         return shardCommitQueueExpiryTimeoutInMillis;
300     }
301
302     public boolean isTransactionDebugContextEnabled() {
303         return transactionDebugContextEnabled;
304     }
305
306     public boolean isUseTellBasedProtocol() {
307         return useTellBasedProtocol;
308     }
309
310     public int getMaximumMessageSliceSize() {
311         return maximumMessageSliceSize;
312     }
313
314     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
315         private final DatastoreContext datastoreContext;
316         private int maxShardDataChangeExecutorPoolSize =
317                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
318         private int maxShardDataChangeExecutorQueueSize =
319                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
320         private int maxShardDataChangeListenerQueueSize =
321                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
322         private int maxShardDataStoreExecutorQueueSize =
323                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
324
325         private Builder(final DatastoreContext datastoreContext) {
326             this.datastoreContext = datastoreContext;
327
328             if (datastoreContext.getDataStoreProperties() != null) {
329                 maxShardDataChangeExecutorPoolSize =
330                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
331                 maxShardDataChangeExecutorQueueSize =
332                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
333                 maxShardDataChangeListenerQueueSize =
334                         datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
335                 maxShardDataStoreExecutorQueueSize =
336                         datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
337             }
338         }
339
340         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
341             // TODO - this is defined in the yang DataStoreProperties but not currently used.
342             return this;
343         }
344
345         public Builder enableMetricCapture(final boolean enableMetricCapture) {
346             // TODO - this is defined in the yang DataStoreProperties but not currently used.
347             return this;
348         }
349
350
351         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
352             datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
353             return this;
354         }
355
356         public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
357             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
358         }
359
360         public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
361             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
362             return this;
363         }
364
365         public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
366             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
367             return this;
368         }
369
370         public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
371             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
372             return this;
373         }
374
375         public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
376             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
377             return this;
378         }
379
380         public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
381             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
382             return this;
383         }
384
385         public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
386             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
387             return this;
388         }
389
390         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
391             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
392             return this;
393         }
394
395         public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
396             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
397             return this;
398         }
399
400         public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
401             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
402             return this;
403         }
404
405         public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
406             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
407             return this;
408         }
409
410         public Builder shardInitializationTimeoutInSeconds(final long timeout) {
411             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
412         }
413
414         public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
415             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
416             return this;
417         }
418
419         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
420             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
421         }
422
423         public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
424             datastoreContext.configurationReader = configurationReader;
425             return this;
426         }
427
428         public Builder persistent(final boolean persistent) {
429             datastoreContext.persistent = persistent;
430             return this;
431         }
432
433         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
434             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
435             return this;
436         }
437
438         public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
439             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
440             return this;
441         }
442
443         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
444             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
445             return this;
446         }
447
448         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
449             datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
450
451             // Retain compatible naming
452             switch (logicalStoreType) {
453                 case CONFIGURATION:
454                     dataStoreName("config");
455                     break;
456                 case OPERATIONAL:
457                     dataStoreName("operational");
458                     break;
459                 default:
460                     dataStoreName(logicalStoreType.name());
461             }
462
463             return this;
464         }
465
466         public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
467             datastoreContext.storeRoot = storeRoot;
468             return this;
469         }
470
471         public Builder dataStoreName(final String dataStoreName) {
472             datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
473             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
474             return this;
475         }
476
477         public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
478             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
479             return this;
480         }
481
482         public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
483             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
484             return this;
485         }
486
487         public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
488             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
489             return this;
490         }
491
492         public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
493             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
494                     value, TimeUnit.SECONDS);
495             return this;
496         }
497
498         public Builder transactionDebugContextEnabled(final boolean value) {
499             datastoreContext.transactionDebugContextEnabled = value;
500             return this;
501         }
502
503         public Builder maxShardDataChangeExecutorPoolSize(final int maxShardDataChangeExecutorPoolSize) {
504             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
505             return this;
506         }
507
508         public Builder maxShardDataChangeExecutorQueueSize(final int maxShardDataChangeExecutorQueueSize) {
509             this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
510             return this;
511         }
512
513         public Builder maxShardDataChangeListenerQueueSize(final int maxShardDataChangeListenerQueueSize) {
514             this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
515             return this;
516         }
517
518         public Builder maxShardDataStoreExecutorQueueSize(final int maxShardDataStoreExecutorQueueSize) {
519             this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
520             return this;
521         }
522
523         public Builder useTellBasedProtocol(final boolean value) {
524             datastoreContext.useTellBasedProtocol = value;
525             return this;
526         }
527
528         /**
529          * For unit tests only.
530          */
531         @VisibleForTesting
532         public Builder shardManagerPersistenceId(final String id) {
533             datastoreContext.shardManagerPersistenceId = id;
534             return this;
535         }
536
537         public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
538             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
539             return this;
540         }
541
542         @Deprecated
543         public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
544             LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
545                     + "use maximum-message-slice-size instead");
546             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
547             return this;
548         }
549
550         public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
551             datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
552             return this;
553         }
554
555         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
556             datastoreContext.setPeerAddressResolver(resolver);
557             return this;
558         }
559
560         public Builder tempFileDirectory(final String tempFileDirectory) {
561             datastoreContext.setTempFileDirectory(tempFileDirectory);
562             return this;
563         }
564
565         public Builder fileBackedStreamingThresholdInMegabytes(final int  fileBackedStreamingThreshold) {
566             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
567             return this;
568         }
569
570         public Builder syncIndexThreshold(final long syncIndexThreshold) {
571             datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
572             return this;
573         }
574
575         @Override
576         public DatastoreContext build() {
577             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
578                     maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
579                     maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
580
581             if (datastoreContext.dataStoreName != null) {
582                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
583             }
584
585             return datastoreContext;
586         }
587     }
588 }