Bug-4214 - Add support for configurable snapshot chunk size.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.util.Timeout;
12 import com.google.common.collect.Sets;
13 import java.util.Set;
14 import java.util.concurrent.TimeUnit;
15 import org.apache.commons.lang3.text.WordUtils;
16 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
17 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
18 import org.opendaylight.controller.cluster.raft.ConfigParams;
19 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
20 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
21 import scala.concurrent.duration.Duration;
22 import scala.concurrent.duration.FiniteDuration;
23
24 /**
25  * Contains contextual data for a data store.
26  *
27  * @author Thomas Pantelis
28  */
29 public class DatastoreContext {
30     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
31
32     public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
33     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
34     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
35     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000;
36     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
37     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
38     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
39     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
40     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
41     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
42     public static final boolean DEFAULT_PERSISTENT = true;
43     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
44     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
45     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
46     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
47     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
48     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
49     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
50     public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
51
52     private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
53
54     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
55     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
56     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
57     private String dataStoreMXBeanType;
58     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
59     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
60     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
61     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
62     private boolean persistent = DEFAULT_PERSISTENT;
63     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
64     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
65     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
66     private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
67     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
68     private boolean writeOnlyTransactionOptimizationsEnabled = true;
69     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
70     private boolean transactionDebugContextEnabled = false;
71     private String customRaftPolicyImplementation = "";
72
73     public static Set<String> getGlobalDatastoreTypes() {
74         return globalDatastoreTypes;
75     }
76
77     private DatastoreContext() {
78         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
79         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
80         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
81         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
82         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
83         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
84         setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
85     }
86
87     private DatastoreContext(DatastoreContext other) {
88         this.dataStoreProperties = other.dataStoreProperties;
89         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
90         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
91         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
92         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
93         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
94         this.shardInitializationTimeout = other.shardInitializationTimeout;
95         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
96         this.persistent = other.persistent;
97         this.configurationReader = other.configurationReader;
98         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
99         this.dataStoreType = other.dataStoreType;
100         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
101         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
102         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
103         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
104         this.customRaftPolicyImplementation = other.customRaftPolicyImplementation;
105
106         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
107         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
108         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
109         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
110         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
111         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
112         setCustomRaftPolicyImplementation(other.customRaftPolicyImplementation);
113         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
114
115     }
116
117     public static Builder newBuilder() {
118         return new Builder(new DatastoreContext());
119     }
120
121     public static Builder newBuilderFrom(DatastoreContext context) {
122         return new Builder(new DatastoreContext(context));
123     }
124
125     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
126         return dataStoreProperties;
127     }
128
129     public Duration getShardTransactionIdleTimeout() {
130         return shardTransactionIdleTimeout;
131     }
132
133     public String getDataStoreMXBeanType() {
134         return dataStoreMXBeanType;
135     }
136
137     public long getOperationTimeoutInMillis() {
138         return operationTimeoutInMillis;
139     }
140
141     public ConfigParams getShardRaftConfig() {
142         return raftConfig;
143     }
144
145     public int getShardTransactionCommitTimeoutInSeconds() {
146         return shardTransactionCommitTimeoutInSeconds;
147     }
148
149     public int getShardTransactionCommitQueueCapacity() {
150         return shardTransactionCommitQueueCapacity;
151     }
152
153     public Timeout getShardInitializationTimeout() {
154         return shardInitializationTimeout;
155     }
156
157     public Timeout getShardLeaderElectionTimeout() {
158         return shardLeaderElectionTimeout;
159     }
160
161     public boolean isPersistent() {
162         return persistent;
163     }
164
165     public AkkaConfigurationReader getConfigurationReader() {
166         return configurationReader;
167     }
168
169     public long getShardElectionTimeoutFactor(){
170         return raftConfig.getElectionTimeoutFactor();
171     }
172
173     public String getDataStoreType(){
174         return dataStoreType;
175     }
176
177     public long getTransactionCreationInitialRateLimit() {
178         return transactionCreationInitialRateLimit;
179     }
180
181     private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
182         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
183                 TimeUnit.MILLISECONDS));
184     }
185
186     private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){
187         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
188     }
189
190
191     private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
192         raftConfig.setIsolatedLeaderCheckInterval(
193                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
194     }
195
196     private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
197         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
198     }
199
200     private void setCustomRaftPolicyImplementation(String customRaftPolicyImplementation) {
201         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
202     }
203
204
205     private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
206         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
207     }
208
209     private void setSnapshotBatchCount(long shardSnapshotBatchCount) {
210         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
211     }
212
213     public int getShardBatchedModificationCount() {
214         return shardBatchedModificationCount;
215     }
216
217     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
218         return writeOnlyTransactionOptimizationsEnabled;
219     }
220
221     public long getShardCommitQueueExpiryTimeoutInMillis() {
222         return shardCommitQueueExpiryTimeoutInMillis;
223     }
224
225     public boolean isTransactionDebugContextEnabled() {
226         return transactionDebugContextEnabled;
227     }
228
229     public int getShardSnapshotChunkSize() {
230         return raftConfig.getSnapshotChunkSize();
231     }
232
233     public void setShardSnapshotChunkSize(int shardSnapshotChunkSize) {
234         raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
235     }
236
237     public static class Builder {
238         private final DatastoreContext datastoreContext;
239         private int maxShardDataChangeExecutorPoolSize =
240                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
241         private int maxShardDataChangeExecutorQueueSize =
242                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
243         private int maxShardDataChangeListenerQueueSize =
244                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
245         private int maxShardDataStoreExecutorQueueSize =
246                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
247
248         private Builder(DatastoreContext datastoreContext) {
249             this.datastoreContext = datastoreContext;
250
251             if(datastoreContext.getDataStoreProperties() != null) {
252                 maxShardDataChangeExecutorPoolSize =
253                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
254                 maxShardDataChangeExecutorQueueSize =
255                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
256                 maxShardDataChangeListenerQueueSize =
257                         datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
258                 maxShardDataStoreExecutorQueueSize =
259                         datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
260             }
261         }
262
263         public Builder boundedMailboxCapacity(int boundedMailboxCapacity) {
264             // TODO - this is defined in the yang DataStoreProperties but not currently used.
265             return this;
266         }
267
268         public Builder enableMetricCapture(boolean enableMetricCapture) {
269             // TODO - this is defined in the yang DataStoreProperties but not currently used.
270             return this;
271         }
272
273
274         public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) {
275             datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
276             return this;
277         }
278
279         public Builder shardTransactionIdleTimeoutInMinutes(long timeout) {
280             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
281         }
282
283         public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
284             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
285             return this;
286         }
287
288         public Builder operationTimeoutInMillis(long operationTimeoutInMillis) {
289             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
290             return this;
291         }
292
293         public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
294             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
295             return this;
296         }
297
298         public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
299             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
300             return this;
301         }
302
303         public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
304             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
305             return this;
306         }
307
308         public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
309             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
310             return this;
311         }
312
313         public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
314             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
315             return this;
316         }
317
318         public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
319             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
320             return this;
321         }
322
323         public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
324             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
325             return this;
326         }
327
328         public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
329             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
330             return this;
331         }
332
333         public Builder shardInitializationTimeoutInSeconds(long timeout) {
334             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
335         }
336
337         public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
338             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
339             return this;
340         }
341
342         public Builder shardLeaderElectionTimeoutInSeconds(long timeout) {
343             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
344         }
345
346         public Builder configurationReader(AkkaConfigurationReader configurationReader){
347             datastoreContext.configurationReader = configurationReader;
348             return this;
349         }
350
351         public Builder persistent(boolean persistent){
352             datastoreContext.persistent = persistent;
353             return this;
354         }
355
356         public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
357             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
358             return this;
359         }
360
361         public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){
362             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
363             return this;
364         }
365
366         public Builder transactionCreationInitialRateLimit(long initialRateLimit){
367             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
368             return this;
369         }
370
371         public Builder dataStoreType(String dataStoreType){
372             datastoreContext.dataStoreType = dataStoreType;
373             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore";
374             return this;
375         }
376
377         public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
378             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
379             return this;
380         }
381
382         public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
383             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
384             return this;
385         }
386
387         public Builder shardCommitQueueExpiryTimeoutInMillis(long value) {
388             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
389             return this;
390         }
391
392         public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) {
393             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
394                     value, TimeUnit.SECONDS);
395             return this;
396         }
397
398         public Builder transactionDebugContextEnabled(boolean value) {
399             datastoreContext.transactionDebugContextEnabled = value;
400             return this;
401         }
402
403         public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
404             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
405             return this;
406         }
407
408         public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) {
409             this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
410             return this;
411         }
412
413         public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) {
414             this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
415             return this;
416         }
417
418         public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) {
419             this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
420             return this;
421         }
422
423         public DatastoreContext build() {
424             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
425                     maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
426                     maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
427
428             if(datastoreContext.dataStoreType != null) {
429                 globalDatastoreTypes.add(datastoreContext.dataStoreType);
430             }
431
432             return datastoreContext;
433         }
434
435         public Builder customRaftPolicyImplementation(String customRaftPolicyImplementation) {
436             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
437             return this;
438         }
439
440         public Builder shardSnapshotChunkSize(int shardSnapshotChunkSize) {
441             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
442             return this;
443         }
444     }
445 }