BUG 2185 : Make the Custom Raft Policy externally configurable
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.util.Timeout;
12 import com.google.common.collect.Sets;
13 import java.util.Set;
14 import java.util.concurrent.TimeUnit;
15 import org.apache.commons.lang3.text.WordUtils;
16 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
17 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
18 import org.opendaylight.controller.cluster.raft.ConfigParams;
19 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
20 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
21 import scala.concurrent.duration.Duration;
22 import scala.concurrent.duration.FiniteDuration;
23
24 /**
25  * Contains contextual data for a data store.
26  *
27  * @author Thomas Pantelis
28  */
29 public class DatastoreContext {
30     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
31
32     public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
33     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
34     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
35     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000;
36     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
37     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
38     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
39     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
40     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
41     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
42     public static final boolean DEFAULT_PERSISTENT = true;
43     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
44     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
45     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
46     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
47     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
48     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
49     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
50
51     private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
52
53     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
54     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
55     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
56     private String dataStoreMXBeanType;
57     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
58     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
59     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
60     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
61     private boolean persistent = DEFAULT_PERSISTENT;
62     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
63     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
64     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
65     private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
66     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
67     private boolean writeOnlyTransactionOptimizationsEnabled = true;
68     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
69     private boolean transactionDebugContextEnabled = false;
70     private String customRaftPolicyImplementation = "";
71
72     public static Set<String> getGlobalDatastoreTypes() {
73         return globalDatastoreTypes;
74     }
75
76     private DatastoreContext() {
77         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
78         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
79         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
80         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
81         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
82         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
83     }
84
85     private DatastoreContext(DatastoreContext other) {
86         this.dataStoreProperties = other.dataStoreProperties;
87         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
88         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
89         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
90         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
91         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
92         this.shardInitializationTimeout = other.shardInitializationTimeout;
93         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
94         this.persistent = other.persistent;
95         this.configurationReader = other.configurationReader;
96         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
97         this.dataStoreType = other.dataStoreType;
98         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
99         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
100         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
101         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
102         this.customRaftPolicyImplementation = other.customRaftPolicyImplementation;
103
104         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
105         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
106         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
107         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
108         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
109         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
110         setCustomRaftPolicyImplementation(other.customRaftPolicyImplementation);
111
112     }
113
114     public static Builder newBuilder() {
115         return new Builder(new DatastoreContext());
116     }
117
118     public static Builder newBuilderFrom(DatastoreContext context) {
119         return new Builder(new DatastoreContext(context));
120     }
121
122     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
123         return dataStoreProperties;
124     }
125
126     public Duration getShardTransactionIdleTimeout() {
127         return shardTransactionIdleTimeout;
128     }
129
130     public String getDataStoreMXBeanType() {
131         return dataStoreMXBeanType;
132     }
133
134     public long getOperationTimeoutInMillis() {
135         return operationTimeoutInMillis;
136     }
137
138     public ConfigParams getShardRaftConfig() {
139         return raftConfig;
140     }
141
142     public int getShardTransactionCommitTimeoutInSeconds() {
143         return shardTransactionCommitTimeoutInSeconds;
144     }
145
146     public int getShardTransactionCommitQueueCapacity() {
147         return shardTransactionCommitQueueCapacity;
148     }
149
150     public Timeout getShardInitializationTimeout() {
151         return shardInitializationTimeout;
152     }
153
154     public Timeout getShardLeaderElectionTimeout() {
155         return shardLeaderElectionTimeout;
156     }
157
158     public boolean isPersistent() {
159         return persistent;
160     }
161
162     public AkkaConfigurationReader getConfigurationReader() {
163         return configurationReader;
164     }
165
166     public long getShardElectionTimeoutFactor(){
167         return raftConfig.getElectionTimeoutFactor();
168     }
169
170     public String getDataStoreType(){
171         return dataStoreType;
172     }
173
174     public long getTransactionCreationInitialRateLimit() {
175         return transactionCreationInitialRateLimit;
176     }
177
178     private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
179         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
180                 TimeUnit.MILLISECONDS));
181     }
182
183     private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){
184         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
185     }
186
187
188     private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
189         raftConfig.setIsolatedLeaderCheckInterval(
190                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
191     }
192
193     private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
194         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
195     }
196
197     private void setCustomRaftPolicyImplementation(String customRaftPolicyImplementation) {
198         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
199     }
200
201
202     private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
203         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
204     }
205
206     private void setSnapshotBatchCount(long shardSnapshotBatchCount) {
207         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
208     }
209
210     public int getShardBatchedModificationCount() {
211         return shardBatchedModificationCount;
212     }
213
214     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
215         return writeOnlyTransactionOptimizationsEnabled;
216     }
217
218     public long getShardCommitQueueExpiryTimeoutInMillis() {
219         return shardCommitQueueExpiryTimeoutInMillis;
220     }
221
222     public boolean isTransactionDebugContextEnabled() {
223         return transactionDebugContextEnabled;
224     }
225
226     public static class Builder {
227         private final DatastoreContext datastoreContext;
228         private int maxShardDataChangeExecutorPoolSize =
229                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
230         private int maxShardDataChangeExecutorQueueSize =
231                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
232         private int maxShardDataChangeListenerQueueSize =
233                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
234         private int maxShardDataStoreExecutorQueueSize =
235                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
236
237         private Builder(DatastoreContext datastoreContext) {
238             this.datastoreContext = datastoreContext;
239
240             if(datastoreContext.getDataStoreProperties() != null) {
241                 maxShardDataChangeExecutorPoolSize =
242                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
243                 maxShardDataChangeExecutorQueueSize =
244                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
245                 maxShardDataChangeListenerQueueSize =
246                         datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
247                 maxShardDataStoreExecutorQueueSize =
248                         datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
249             }
250         }
251
252         public Builder boundedMailboxCapacity(int boundedMailboxCapacity) {
253             // TODO - this is defined in the yang DataStoreProperties but not currently used.
254             return this;
255         }
256
257         public Builder enableMetricCapture(boolean enableMetricCapture) {
258             // TODO - this is defined in the yang DataStoreProperties but not currently used.
259             return this;
260         }
261
262
263         public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) {
264             datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
265             return this;
266         }
267
268         public Builder shardTransactionIdleTimeoutInMinutes(long timeout) {
269             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
270         }
271
272         public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
273             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
274             return this;
275         }
276
277         public Builder operationTimeoutInMillis(long operationTimeoutInMillis) {
278             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
279             return this;
280         }
281
282         public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
283             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
284             return this;
285         }
286
287         public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
288             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
289             return this;
290         }
291
292         public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
293             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
294             return this;
295         }
296
297         public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
298             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
299             return this;
300         }
301
302         public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
303             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
304             return this;
305         }
306
307         public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
308             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
309             return this;
310         }
311
312         public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
313             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
314             return this;
315         }
316
317         public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
318             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
319             return this;
320         }
321
322         public Builder shardInitializationTimeoutInSeconds(long timeout) {
323             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
324         }
325
326         public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
327             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
328             return this;
329         }
330
331         public Builder shardLeaderElectionTimeoutInSeconds(long timeout) {
332             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
333         }
334
335         public Builder configurationReader(AkkaConfigurationReader configurationReader){
336             datastoreContext.configurationReader = configurationReader;
337             return this;
338         }
339
340         public Builder persistent(boolean persistent){
341             datastoreContext.persistent = persistent;
342             return this;
343         }
344
345         public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
346             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
347             return this;
348         }
349
350         public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){
351             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
352             return this;
353         }
354
355         public Builder transactionCreationInitialRateLimit(long initialRateLimit){
356             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
357             return this;
358         }
359
360         public Builder dataStoreType(String dataStoreType){
361             datastoreContext.dataStoreType = dataStoreType;
362             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore";
363             return this;
364         }
365
366         public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
367             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
368             return this;
369         }
370
371         public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
372             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
373             return this;
374         }
375
376         public Builder shardCommitQueueExpiryTimeoutInMillis(long value) {
377             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
378             return this;
379         }
380
381         public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) {
382             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
383                     value, TimeUnit.SECONDS);
384             return this;
385         }
386
387         public Builder transactionDebugContextEnabled(boolean value) {
388             datastoreContext.transactionDebugContextEnabled = value;
389             return this;
390         }
391
392         public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
393             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
394             return this;
395         }
396
397         public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) {
398             this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
399             return this;
400         }
401
402         public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) {
403             this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
404             return this;
405         }
406
407         public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) {
408             this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
409             return this;
410         }
411
412         public DatastoreContext build() {
413             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
414                     maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
415                     maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
416
417             if(datastoreContext.dataStoreType != null) {
418                 globalDatastoreTypes.add(datastoreContext.dataStoreType);
419             }
420
421             return datastoreContext;
422         }
423
424         public Builder customRaftPolicyImplementation(String customRaftPolicyImplementation) {
425             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
426             return this;
427         }
428     }
429 }