Fix DistributedDataStoreIntegrationTest failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.util.Timeout;
12 import com.google.common.collect.Sets;
13 import java.util.Set;
14 import java.util.concurrent.TimeUnit;
15 import org.apache.commons.lang3.text.WordUtils;
16 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
17 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
18 import org.opendaylight.controller.cluster.raft.ConfigParams;
19 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
20 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
21 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
22 import scala.concurrent.duration.Duration;
23 import scala.concurrent.duration.FiniteDuration;
24
25 /**
26  * Contains contextual data for a data store.
27  *
28  * @author Thomas Pantelis
29  */
30 public class DatastoreContext {
31     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
32
33     public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
34     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
35     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
36     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000;
37     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
38     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
39     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
40     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
41     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
42     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
43     public static final boolean DEFAULT_PERSISTENT = true;
44     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
45     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
46     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
47     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
48     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
49     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
50     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
51     public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
52
53     private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
54
55     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
56     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
57     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
58     private String dataStoreMXBeanType;
59     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
60     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
61     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
62     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
63     private boolean persistent = DEFAULT_PERSISTENT;
64     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
65     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
66     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
67     private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
68     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
69     private boolean writeOnlyTransactionOptimizationsEnabled = true;
70     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
71     private boolean transactionDebugContextEnabled = false;
72
73     public static Set<String> getGlobalDatastoreTypes() {
74         return globalDatastoreTypes;
75     }
76
77     private DatastoreContext() {
78         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
79         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
80         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
81         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
82         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
83         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
84         setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
85     }
86
87     private DatastoreContext(DatastoreContext other) {
88         this.dataStoreProperties = other.dataStoreProperties;
89         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
90         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
91         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
92         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
93         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
94         this.shardInitializationTimeout = other.shardInitializationTimeout;
95         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
96         this.persistent = other.persistent;
97         this.configurationReader = other.configurationReader;
98         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
99         this.dataStoreType = other.dataStoreType;
100         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
101         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
102         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
103         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
104
105         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
106         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
107         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
108         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
109         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
110         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
111         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
112         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
113         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
114     }
115
116     public static Builder newBuilder() {
117         return new Builder(new DatastoreContext());
118     }
119
120     public static Builder newBuilderFrom(DatastoreContext context) {
121         return new Builder(new DatastoreContext(context));
122     }
123
124     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
125         return dataStoreProperties;
126     }
127
128     public Duration getShardTransactionIdleTimeout() {
129         return shardTransactionIdleTimeout;
130     }
131
132     public String getDataStoreMXBeanType() {
133         return dataStoreMXBeanType;
134     }
135
136     public long getOperationTimeoutInMillis() {
137         return operationTimeoutInMillis;
138     }
139
140     public ConfigParams getShardRaftConfig() {
141         return raftConfig;
142     }
143
144     public int getShardTransactionCommitTimeoutInSeconds() {
145         return shardTransactionCommitTimeoutInSeconds;
146     }
147
148     public int getShardTransactionCommitQueueCapacity() {
149         return shardTransactionCommitQueueCapacity;
150     }
151
152     public Timeout getShardInitializationTimeout() {
153         return shardInitializationTimeout;
154     }
155
156     public Timeout getShardLeaderElectionTimeout() {
157         return shardLeaderElectionTimeout;
158     }
159
160     public boolean isPersistent() {
161         return persistent;
162     }
163
164     public AkkaConfigurationReader getConfigurationReader() {
165         return configurationReader;
166     }
167
168     public long getShardElectionTimeoutFactor(){
169         return raftConfig.getElectionTimeoutFactor();
170     }
171
172     public String getDataStoreType(){
173         return dataStoreType;
174     }
175
176     public long getTransactionCreationInitialRateLimit() {
177         return transactionCreationInitialRateLimit;
178     }
179
180     private void setPeerAddressResolver(PeerAddressResolver resolver) {
181         raftConfig.setPeerAddressResolver(resolver);
182     }
183
184     private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
185         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
186                 TimeUnit.MILLISECONDS));
187     }
188
189     private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){
190         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
191     }
192
193
194     private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
195         raftConfig.setIsolatedLeaderCheckInterval(
196                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
197     }
198
199     private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
200         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
201     }
202
203     private void setCustomRaftPolicyImplementation(String customRaftPolicyImplementation) {
204         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
205     }
206
207     private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
208         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
209     }
210
211     private void setSnapshotBatchCount(long shardSnapshotBatchCount) {
212         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
213     }
214
215     private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) {
216         raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
217     }
218
219     public int getShardBatchedModificationCount() {
220         return shardBatchedModificationCount;
221     }
222
223     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
224         return writeOnlyTransactionOptimizationsEnabled;
225     }
226
227     public long getShardCommitQueueExpiryTimeoutInMillis() {
228         return shardCommitQueueExpiryTimeoutInMillis;
229     }
230
231     public boolean isTransactionDebugContextEnabled() {
232         return transactionDebugContextEnabled;
233     }
234
235     public int getShardSnapshotChunkSize() {
236         return raftConfig.getSnapshotChunkSize();
237     }
238
239     public static class Builder {
240         private final DatastoreContext datastoreContext;
241         private int maxShardDataChangeExecutorPoolSize =
242                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
243         private int maxShardDataChangeExecutorQueueSize =
244                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
245         private int maxShardDataChangeListenerQueueSize =
246                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
247         private int maxShardDataStoreExecutorQueueSize =
248                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
249
250         private Builder(DatastoreContext datastoreContext) {
251             this.datastoreContext = datastoreContext;
252
253             if(datastoreContext.getDataStoreProperties() != null) {
254                 maxShardDataChangeExecutorPoolSize =
255                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
256                 maxShardDataChangeExecutorQueueSize =
257                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
258                 maxShardDataChangeListenerQueueSize =
259                         datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
260                 maxShardDataStoreExecutorQueueSize =
261                         datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
262             }
263         }
264
265         public Builder boundedMailboxCapacity(int boundedMailboxCapacity) {
266             // TODO - this is defined in the yang DataStoreProperties but not currently used.
267             return this;
268         }
269
270         public Builder enableMetricCapture(boolean enableMetricCapture) {
271             // TODO - this is defined in the yang DataStoreProperties but not currently used.
272             return this;
273         }
274
275
276         public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) {
277             datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
278             return this;
279         }
280
281         public Builder shardTransactionIdleTimeoutInMinutes(long timeout) {
282             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
283         }
284
285         public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
286             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
287             return this;
288         }
289
290         public Builder operationTimeoutInMillis(long operationTimeoutInMillis) {
291             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
292             return this;
293         }
294
295         public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
296             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
297             return this;
298         }
299
300         public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
301             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
302             return this;
303         }
304
305         public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
306             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
307             return this;
308         }
309
310         public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
311             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
312             return this;
313         }
314
315         public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
316             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
317             return this;
318         }
319
320         public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
321             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
322             return this;
323         }
324
325         public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
326             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
327             return this;
328         }
329
330         public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
331             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
332             return this;
333         }
334
335         public Builder shardInitializationTimeoutInSeconds(long timeout) {
336             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
337         }
338
339         public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
340             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
341             return this;
342         }
343
344         public Builder shardLeaderElectionTimeoutInSeconds(long timeout) {
345             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
346         }
347
348         public Builder configurationReader(AkkaConfigurationReader configurationReader){
349             datastoreContext.configurationReader = configurationReader;
350             return this;
351         }
352
353         public Builder persistent(boolean persistent){
354             datastoreContext.persistent = persistent;
355             return this;
356         }
357
358         public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
359             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
360             return this;
361         }
362
363         public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){
364             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
365             return this;
366         }
367
368         public Builder transactionCreationInitialRateLimit(long initialRateLimit){
369             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
370             return this;
371         }
372
373         public Builder dataStoreType(String dataStoreType){
374             datastoreContext.dataStoreType = dataStoreType;
375             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore";
376             return this;
377         }
378
379         public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
380             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
381             return this;
382         }
383
384         public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
385             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
386             return this;
387         }
388
389         public Builder shardCommitQueueExpiryTimeoutInMillis(long value) {
390             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
391             return this;
392         }
393
394         public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) {
395             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
396                     value, TimeUnit.SECONDS);
397             return this;
398         }
399
400         public Builder transactionDebugContextEnabled(boolean value) {
401             datastoreContext.transactionDebugContextEnabled = value;
402             return this;
403         }
404
405         public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
406             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
407             return this;
408         }
409
410         public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) {
411             this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
412             return this;
413         }
414
415         public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) {
416             this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
417             return this;
418         }
419
420         public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) {
421             this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
422             return this;
423         }
424
425         public DatastoreContext build() {
426             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
427                     maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
428                     maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
429
430             if(datastoreContext.dataStoreType != null) {
431                 globalDatastoreTypes.add(datastoreContext.dataStoreType);
432             }
433
434             return datastoreContext;
435         }
436
437         public Builder customRaftPolicyImplementation(String customRaftPolicyImplementation) {
438             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
439             return this;
440         }
441
442         public Builder shardSnapshotChunkSize(int shardSnapshotChunkSize) {
443             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
444             return this;
445         }
446
447         public Builder shardPeerAddressResolver(PeerAddressResolver resolver) {
448             datastoreContext.setPeerAddressResolver(resolver);
449             return this;
450         }
451     }
452 }