BUG-5280: do not cache modify responses
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.util.Timeout;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.Sets;
15 import java.util.Set;
16 import java.util.concurrent.TimeUnit;
17 import org.apache.commons.lang3.text.WordUtils;
18 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
19 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
20 import org.opendaylight.controller.cluster.raft.ConfigParams;
21 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
22 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
25 import scala.concurrent.duration.Duration;
26 import scala.concurrent.duration.FiniteDuration;
27
28 /**
29  * Contains contextual data for a data store.
30  *
31  * @author Thomas Pantelis
32  */
33 public class DatastoreContext {
34     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
35
36     public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
37     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
38     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
39     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
40     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
41     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
42     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
43             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
44     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
45     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
46     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
47     public static final boolean DEFAULT_PERSISTENT = true;
48     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
49     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
50     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
51     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
52     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
53     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
54     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
55             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
56     public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
57
58     private static final Set<String> GLOBAL_DATASTORE_NAMES = Sets.newConcurrentHashSet();
59
60     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
61     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
62     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
63     private String dataStoreMXBeanType;
64     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
65     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
66     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
67     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
68     private boolean persistent = DEFAULT_PERSISTENT;
69     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
70     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
71     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
72     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
73     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
74     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
75     private boolean writeOnlyTransactionOptimizationsEnabled = true;
76     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
77     private boolean transactionDebugContextEnabled = false;
78     private String shardManagerPersistenceId;
79
80     public static Set<String> getGlobalDatastoreNames() {
81         return GLOBAL_DATASTORE_NAMES;
82     }
83
84     private DatastoreContext() {
85         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
86         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
87         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
88         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
89         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
90         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
91         setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
92     }
93
94     private DatastoreContext(DatastoreContext other) {
95         this.dataStoreProperties = other.dataStoreProperties;
96         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
97         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
98         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
99         this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
100         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
101         this.shardInitializationTimeout = other.shardInitializationTimeout;
102         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
103         this.persistent = other.persistent;
104         this.configurationReader = other.configurationReader;
105         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
106         this.dataStoreName = other.dataStoreName;
107         this.logicalStoreType = other.logicalStoreType;
108         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
109         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
110         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
111         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
112         this.shardManagerPersistenceId = other.shardManagerPersistenceId;
113
114         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
115         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
116         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
117         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
118         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
119         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
120         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
121         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
122         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
123     }
124
125     public static Builder newBuilder() {
126         return new Builder(new DatastoreContext());
127     }
128
129     public static Builder newBuilderFrom(DatastoreContext context) {
130         return new Builder(new DatastoreContext(context));
131     }
132
133     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
134         return dataStoreProperties;
135     }
136
137     public Duration getShardTransactionIdleTimeout() {
138         return shardTransactionIdleTimeout;
139     }
140
141     public String getDataStoreMXBeanType() {
142         return dataStoreMXBeanType;
143     }
144
145     public long getOperationTimeoutInMillis() {
146         return operationTimeoutInMillis;
147     }
148
149     public ConfigParams getShardRaftConfig() {
150         return raftConfig;
151     }
152
153     public int getShardTransactionCommitTimeoutInSeconds() {
154         return shardTransactionCommitTimeoutInSeconds;
155     }
156
157     public int getShardTransactionCommitQueueCapacity() {
158         return shardTransactionCommitQueueCapacity;
159     }
160
161     public Timeout getShardInitializationTimeout() {
162         return shardInitializationTimeout;
163     }
164
165     public Timeout getShardLeaderElectionTimeout() {
166         return shardLeaderElectionTimeout;
167     }
168
169     public boolean isPersistent() {
170         return persistent;
171     }
172
173     public AkkaConfigurationReader getConfigurationReader() {
174         return configurationReader;
175     }
176
177     public long getShardElectionTimeoutFactor() {
178         return raftConfig.getElectionTimeoutFactor();
179     }
180
181     public String getDataStoreName() {
182         return dataStoreName;
183     }
184
185     public LogicalDatastoreType getLogicalStoreType() {
186         return logicalStoreType;
187     }
188
189     public long getTransactionCreationInitialRateLimit() {
190         return transactionCreationInitialRateLimit;
191     }
192
193     public String getShardManagerPersistenceId() {
194         return shardManagerPersistenceId;
195     }
196
197     private void setPeerAddressResolver(PeerAddressResolver resolver) {
198         raftConfig.setPeerAddressResolver(resolver);
199     }
200
201     private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis) {
202         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
203                 TimeUnit.MILLISECONDS));
204     }
205
206     private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
207         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
208     }
209
210
211     private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
212         raftConfig.setIsolatedLeaderCheckInterval(
213                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
214     }
215
216     private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
217         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
218     }
219
220     private void setCustomRaftPolicyImplementation(String customRaftPolicyImplementation) {
221         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
222     }
223
224     private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
225         Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
226                 && shardSnapshotDataThresholdPercentage <= 100);
227         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
228     }
229
230     private void setSnapshotBatchCount(long shardSnapshotBatchCount) {
231         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
232     }
233
234     private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) {
235         raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
236     }
237
238     public int getShardBatchedModificationCount() {
239         return shardBatchedModificationCount;
240     }
241
242     public boolean isWriteOnlyTransactionOptimizationsEnabled() {
243         return writeOnlyTransactionOptimizationsEnabled;
244     }
245
246     public long getShardCommitQueueExpiryTimeoutInMillis() {
247         return shardCommitQueueExpiryTimeoutInMillis;
248     }
249
250     public boolean isTransactionDebugContextEnabled() {
251         return transactionDebugContextEnabled;
252     }
253
254     public int getShardSnapshotChunkSize() {
255         return raftConfig.getSnapshotChunkSize();
256     }
257
258     public static class Builder {
259         private final DatastoreContext datastoreContext;
260         private int maxShardDataChangeExecutorPoolSize =
261                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
262         private int maxShardDataChangeExecutorQueueSize =
263                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
264         private int maxShardDataChangeListenerQueueSize =
265                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
266         private int maxShardDataStoreExecutorQueueSize =
267                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
268
269         private Builder(DatastoreContext datastoreContext) {
270             this.datastoreContext = datastoreContext;
271
272             if (datastoreContext.getDataStoreProperties() != null) {
273                 maxShardDataChangeExecutorPoolSize =
274                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
275                 maxShardDataChangeExecutorQueueSize =
276                         datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
277                 maxShardDataChangeListenerQueueSize =
278                         datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
279                 maxShardDataStoreExecutorQueueSize =
280                         datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
281             }
282         }
283
284         public Builder boundedMailboxCapacity(int boundedMailboxCapacity) {
285             // TODO - this is defined in the yang DataStoreProperties but not currently used.
286             return this;
287         }
288
289         public Builder enableMetricCapture(boolean enableMetricCapture) {
290             // TODO - this is defined in the yang DataStoreProperties but not currently used.
291             return this;
292         }
293
294
295         public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) {
296             datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
297             return this;
298         }
299
300         public Builder shardTransactionIdleTimeoutInMinutes(long timeout) {
301             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
302         }
303
304         public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
305             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
306             return this;
307         }
308
309         public Builder operationTimeoutInMillis(long operationTimeoutInMillis) {
310             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
311             return this;
312         }
313
314         public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
315             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
316             return this;
317         }
318
319         public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
320             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
321             return this;
322         }
323
324         public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
325             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
326             return this;
327         }
328
329         public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
330             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
331             return this;
332         }
333
334         public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
335             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
336             return this;
337         }
338
339         public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
340             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
341             return this;
342         }
343
344         public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
345             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
346             return this;
347         }
348
349         public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
350             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
351             return this;
352         }
353
354         public Builder shardInitializationTimeoutInSeconds(long timeout) {
355             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
356         }
357
358         public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
359             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
360             return this;
361         }
362
363         public Builder shardLeaderElectionTimeoutInSeconds(long timeout) {
364             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
365         }
366
367         public Builder configurationReader(AkkaConfigurationReader configurationReader) {
368             datastoreContext.configurationReader = configurationReader;
369             return this;
370         }
371
372         public Builder persistent(boolean persistent) {
373             datastoreContext.persistent = persistent;
374             return this;
375         }
376
377         public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
378             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
379             return this;
380         }
381
382         public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor) {
383             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
384             return this;
385         }
386
387         public Builder transactionCreationInitialRateLimit(long initialRateLimit) {
388             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
389             return this;
390         }
391
392         public Builder logicalStoreType(LogicalDatastoreType logicalStoreType) {
393             datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
394
395             // Retain compatible naming
396             switch (logicalStoreType) {
397                 case CONFIGURATION:
398                     dataStoreName("config");
399                     break;
400                 case OPERATIONAL:
401                     dataStoreName("operational");
402                     break;
403                 default:
404                     dataStoreName(logicalStoreType.name());
405             }
406
407             return this;
408         }
409
410         public Builder dataStoreName(String dataStoreName) {
411             datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
412             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
413             return this;
414         }
415
416         public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
417             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
418             return this;
419         }
420
421         public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
422             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
423             return this;
424         }
425
426         public Builder shardCommitQueueExpiryTimeoutInMillis(long value) {
427             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
428             return this;
429         }
430
431         public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) {
432             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
433                     value, TimeUnit.SECONDS);
434             return this;
435         }
436
437         public Builder transactionDebugContextEnabled(boolean value) {
438             datastoreContext.transactionDebugContextEnabled = value;
439             return this;
440         }
441
442         public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
443             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
444             return this;
445         }
446
447         public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) {
448             this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
449             return this;
450         }
451
452         public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) {
453             this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
454             return this;
455         }
456
457         public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) {
458             this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
459             return this;
460         }
461
462         /**
463          * For unit tests only.
464          */
465         @VisibleForTesting
466         public Builder shardManagerPersistenceId(String id) {
467             datastoreContext.shardManagerPersistenceId = id;
468             return this;
469         }
470
471         public DatastoreContext build() {
472             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
473                     maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
474                     maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
475
476             if (datastoreContext.dataStoreName != null) {
477                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
478             }
479
480             return datastoreContext;
481         }
482
483         public Builder customRaftPolicyImplementation(String customRaftPolicyImplementation) {
484             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
485             return this;
486         }
487
488         public Builder shardSnapshotChunkSize(int shardSnapshotChunkSize) {
489             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
490             return this;
491         }
492
493         public Builder shardPeerAddressResolver(PeerAddressResolver resolver) {
494             datastoreContext.setPeerAddressResolver(resolver);
495             return this;
496         }
497     }
498 }