98afd7f4e02fc76354fd22765e75ed2ba8b98123
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.util.Timeout;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.TimeUnit;
16 import org.apache.commons.text.WordUtils;
17 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
18 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
19 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
20 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
21 import org.opendaylight.controller.cluster.raft.ConfigParams;
22 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
23 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
24 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
25 import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStoreConfigProperties;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.duration.FiniteDuration;
30
31 /**
32  * Contains contextual data for a data store.
33  *
34  * @author Thomas Pantelis
35  */
36 // Non-final for mocking
37 public class DatastoreContext implements ClientActorConfig {
38     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
39
40     public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
41         TimeUnit.MINUTES);
42     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
43     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
44     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
45     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
46     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
47     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
48             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
49     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
50     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
51     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
52     public static final boolean DEFAULT_PERSISTENT = true;
53     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
54     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
55     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
56     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
57     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
58     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
59     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
60             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
61     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
62     public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
63
64     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
65
66     private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
67
68     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
69
70     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
71
72     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
73     private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
74     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
75     private String dataStoreMXBeanType;
76     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
77     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
78     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
79     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
80     private boolean persistent = DEFAULT_PERSISTENT;
81     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
82     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
83     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
84     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
85     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
86     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
87     private boolean writeOnlyTransactionOptimizationsEnabled = true;
88     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
89     private boolean useTellBasedProtocol = false;
90     private boolean transactionDebugContextEnabled = false;
91     private String shardManagerPersistenceId;
92     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
93     private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
94     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
95     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
96     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
97
98     public static Set<String> getGlobalDatastoreNames() {
99         return GLOBAL_DATASTORE_NAMES;
100     }
101
102     DatastoreContext() {
103         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
104         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
105         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
106         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
107         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
108         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
109         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
110         setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
111     }
112
113     private DatastoreContext(final DatastoreContext other) {
114         this.dataStoreProperties = other.dataStoreProperties;
115         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
116         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
117         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
118         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
119         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
120         this.shardInitializationTimeout = other.shardInitializationTimeout;
121         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
122         this.persistent = other.persistent;
123         this.configurationReader = other.configurationReader;
124         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
125         this.dataStoreName = other.dataStoreName;
126         this.logicalStoreType = other.logicalStoreType;
127         this.storeRoot = other.storeRoot;
128         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
129         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
130         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
131         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
132         this.shardManagerPersistenceId = other.shardManagerPersistenceId;
133         this.useTellBasedProtocol = other.useTellBasedProtocol;
134         this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
135         this.requestTimeout = other.requestTimeout;
136         this.noProgressTimeout = other.noProgressTimeout;
137         this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
138
139         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
140         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
141         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
142         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
143         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
144         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
145         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
146         setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
147         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
148         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
149         setTempFileDirectory(other.getTempFileDirectory());
150         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
151         setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
152     }
153
154     public static Builder newBuilder() {
155         return new Builder(new DatastoreContext());
156     }
157
158     public static Builder newBuilderFrom(final DatastoreContext context) {
159         return new Builder(new DatastoreContext(context));
160     }
161
162     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
163         return dataStoreProperties;
164     }
165
166     public FiniteDuration getShardTransactionIdleTimeout() {
167         return shardTransactionIdleTimeout;
168     }
169
170     public String getDataStoreMXBeanType() {
171         return dataStoreMXBeanType;
172     }
173
174     public long getOperationTimeoutInMillis() {
175         return operationTimeoutInMillis;
176     }
177
178     public ConfigParams getShardRaftConfig() {
179         return raftConfig;
180     }
181
182     public int getShardTransactionCommitTimeoutInSeconds() {
183         return shardTransactionCommitTimeoutInSeconds;
184     }
185
186     public int getShardTransactionCommitQueueCapacity() {
187         return shardTransactionCommitQueueCapacity;
188     }
189
190     public Timeout getShardInitializationTimeout() {
191         return shardInitializationTimeout;
192     }
193
194     public Timeout getShardLeaderElectionTimeout() {
195         return shardLeaderElectionTimeout;
196     }
197
198     public boolean isPersistent() {
199         return persistent;
200     }
201
202     public AkkaConfigurationReader getConfigurationReader() {
203         return configurationReader;
204     }
205
206     public long getShardElectionTimeoutFactor() {
207         return raftConfig.getElectionTimeoutFactor();
208     }
209
210     public String getDataStoreName() {
211         return dataStoreName;
212     }
213
214     public LogicalDatastoreType getLogicalStoreType() {
215         return logicalStoreType;
216     }
217
218     public YangInstanceIdentifier getStoreRoot() {
219         return storeRoot;
220     }
221
222     public long getTransactionCreationInitialRateLimit() {
223         return transactionCreationInitialRateLimit;
224     }
225
226     public String getShardManagerPersistenceId() {
227         return shardManagerPersistenceId;
228     }
229
230     @Override
231     public String getTempFileDirectory() {
232         return raftConfig.getTempFileDirectory();
233     }
234
235     private void setTempFileDirectory(final String tempFileDirectory) {
236         raftConfig.setTempFileDirectory(tempFileDirectory);
237     }
238
239     @Override
240     public int getFileBackedStreamingThreshold() {
241         return raftConfig.getFileBackedStreamingThreshold();
242     }
243
244     private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
245         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
246     }
247
248     private void setPeerAddressResolver(final PeerAddressResolver resolver) {
249         raftConfig.setPeerAddressResolver(resolver);
250     }
251
252     private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
253         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
254                 TimeUnit.MILLISECONDS));
255     }
256
257     private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
258         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
259     }
260
261
262     private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
263         raftConfig.setIsolatedLeaderCheckInterval(
264                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
265     }
266
267     private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
268         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
269     }
270
271     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
272         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
273     }
274
275     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
276         Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
277                 && shardSnapshotDataThresholdPercentage <= 100);
278         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
279     }
280
281     private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
282         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
283     }
284
285     @Deprecated
286     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
287         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
288         // maximumMessageSliceSize.
289         if (shardSnapshotChunkSize < maximumMessageSliceSize) {
290             raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
291         }
292     }
293
294     private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
295         raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
296         this.maximumMessageSliceSize = maximumMessageSliceSize;
297     }
298
299     private void setSyncIndexThreshold(final long syncIndexThreshold) {
300         raftConfig.setSyncIndexThreshold(syncIndexThreshold);
301     }
302
303     public int getShardBatchedModificationCount() {
304         return shardBatchedModificationCount;
305     }
306
307     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
308         return writeOnlyTransactionOptimizationsEnabled;
309     }
310
311     public long getShardCommitQueueExpiryTimeoutInMillis() {
312         return shardCommitQueueExpiryTimeoutInMillis;
313     }
314
315     public boolean isTransactionDebugContextEnabled() {
316         return transactionDebugContextEnabled;
317     }
318
319     public boolean isUseTellBasedProtocol() {
320         return useTellBasedProtocol;
321     }
322
323     @Override
324     public int getMaximumMessageSliceSize() {
325         return maximumMessageSliceSize;
326     }
327
328     @Override
329     public long getBackendAlivenessTimerInterval() {
330         return backendAlivenessTimerInterval;
331     }
332
333     @Override
334     public long getRequestTimeout() {
335         return requestTimeout;
336     }
337
338     @Override
339     public long getNoProgressTimeout() {
340         return noProgressTimeout;
341     }
342
343     public int getInitialPayloadSerializedBufferCapacity() {
344         return initialPayloadSerializedBufferCapacity;
345     }
346
347     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
348         private final DatastoreContext datastoreContext;
349         private int maxShardDataChangeExecutorPoolSize =
350                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
351         private int maxShardDataChangeExecutorQueueSize =
352                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
353         private int maxShardDataChangeListenerQueueSize =
354                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
355         private int maxShardDataStoreExecutorQueueSize =
356                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
357
358         Builder(final DatastoreContext datastoreContext) {
359             this.datastoreContext = datastoreContext;
360
361             if (datastoreContext.getDataStoreProperties() != null) {
362                 maxShardDataChangeExecutorPoolSize =
363                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
364                 maxShardDataChangeExecutorQueueSize =
365                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
366                 maxShardDataChangeListenerQueueSize =
367                         datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
368                 maxShardDataStoreExecutorQueueSize =
369                         datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
370             }
371         }
372
373         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
374             // TODO - this is defined in the yang DataStoreProperties but not currently used.
375             return this;
376         }
377
378         public Builder enableMetricCapture(final boolean enableMetricCapture) {
379             // TODO - this is defined in the yang DataStoreProperties but not currently used.
380             return this;
381         }
382
383
384         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
385             datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
386             return this;
387         }
388
389         public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
390             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
391         }
392
393         public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
394             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
395             return this;
396         }
397
398         public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
399             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
400             return this;
401         }
402
403         public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
404             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
405             return this;
406         }
407
408         public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
409             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
410             return this;
411         }
412
413         public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
414             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
415             return this;
416         }
417
418         public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
419             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
420             return this;
421         }
422
423         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
424             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
425             return this;
426         }
427
428         public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
429             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
430             return this;
431         }
432
433         public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
434             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
435             return this;
436         }
437
438         public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
439             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
440             return this;
441         }
442
443         public Builder shardInitializationTimeoutInSeconds(final long timeout) {
444             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
445         }
446
447         public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
448             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
449             return this;
450         }
451
452         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
453             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
454         }
455
456         public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
457             datastoreContext.configurationReader = configurationReader;
458             return this;
459         }
460
461         public Builder persistent(final boolean persistent) {
462             datastoreContext.persistent = persistent;
463             return this;
464         }
465
466         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
467             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
468             return this;
469         }
470
471         public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
472             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
473             return this;
474         }
475
476         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
477             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
478             return this;
479         }
480
481         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
482             datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
483
484             // Retain compatible naming
485             switch (logicalStoreType) {
486                 case CONFIGURATION:
487                     dataStoreName("config");
488                     break;
489                 case OPERATIONAL:
490                     dataStoreName("operational");
491                     break;
492                 default:
493                     dataStoreName(logicalStoreType.name());
494             }
495
496             return this;
497         }
498
499         public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
500             datastoreContext.storeRoot = storeRoot;
501             return this;
502         }
503
504         public Builder dataStoreName(final String dataStoreName) {
505             datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
506             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
507             return this;
508         }
509
510         public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
511             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
512             return this;
513         }
514
515         public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
516             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
517             return this;
518         }
519
520         public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
521             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
522             return this;
523         }
524
525         public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
526             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
527                     value, TimeUnit.SECONDS);
528             return this;
529         }
530
531         public Builder transactionDebugContextEnabled(final boolean value) {
532             datastoreContext.transactionDebugContextEnabled = value;
533             return this;
534         }
535
536         public Builder maxShardDataChangeExecutorPoolSize(final int newMaxShardDataChangeExecutorPoolSize) {
537             this.maxShardDataChangeExecutorPoolSize = newMaxShardDataChangeExecutorPoolSize;
538             return this;
539         }
540
541         public Builder maxShardDataChangeExecutorQueueSize(final int newMaxShardDataChangeExecutorQueueSize) {
542             this.maxShardDataChangeExecutorQueueSize = newMaxShardDataChangeExecutorQueueSize;
543             return this;
544         }
545
546         public Builder maxShardDataChangeListenerQueueSize(final int newMaxShardDataChangeListenerQueueSize) {
547             this.maxShardDataChangeListenerQueueSize = newMaxShardDataChangeListenerQueueSize;
548             return this;
549         }
550
551         public Builder maxShardDataStoreExecutorQueueSize(final int newMaxShardDataStoreExecutorQueueSize) {
552             this.maxShardDataStoreExecutorQueueSize = newMaxShardDataStoreExecutorQueueSize;
553             return this;
554         }
555
556         public Builder useTellBasedProtocol(final boolean value) {
557             datastoreContext.useTellBasedProtocol = value;
558             return this;
559         }
560
561         /**
562          * For unit tests only.
563          */
564         @VisibleForTesting
565         public Builder shardManagerPersistenceId(final String id) {
566             datastoreContext.shardManagerPersistenceId = id;
567             return this;
568         }
569
570         public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
571             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
572             return this;
573         }
574
575         @Deprecated
576         public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
577             LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
578                     + "use maximum-message-slice-size instead");
579             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
580             return this;
581         }
582
583         public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
584             datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
585             return this;
586         }
587
588         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
589             datastoreContext.setPeerAddressResolver(resolver);
590             return this;
591         }
592
593         public Builder tempFileDirectory(final String tempFileDirectory) {
594             datastoreContext.setTempFileDirectory(tempFileDirectory);
595             return this;
596         }
597
598         public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
599             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
600             return this;
601         }
602
603         public Builder syncIndexThreshold(final long syncIndexThreshold) {
604             datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
605             return this;
606         }
607
608         public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
609             datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
610             return this;
611         }
612
613         public Builder frontendRequestTimeoutInSeconds(final long timeout) {
614             datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
615             return this;
616         }
617
618         public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
619             datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
620             return this;
621         }
622
623         public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
624             datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
625             return this;
626         }
627
628         @Override
629         public DatastoreContext build() {
630             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.builder()
631                     .maxDataChangeExecutorPoolSize(maxShardDataChangeExecutorPoolSize)
632                     .maxDataChangeExecutorQueueSize(maxShardDataChangeExecutorQueueSize)
633                     .maxDataChangeListenerQueueSize(maxShardDataChangeListenerQueueSize)
634                     .maxDataStoreExecutorQueueSize(maxShardDataStoreExecutorQueueSize)
635                     .build();
636
637             if (datastoreContext.dataStoreName != null) {
638                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
639             }
640
641             return datastoreContext;
642         }
643     }
644 }