BUG-8618: make sync threshold tuneable
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
index eeb6ad3155e2bc91d9e910728d589cca2084603b..02f2768fbb2a74f3fce79994800dbfb88da774d9 100644 (file)
@@ -11,8 +11,8 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.text.WordUtils;
 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
@@ -56,7 +56,11 @@ public class DatastoreContext {
             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
     public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
 
-    private static final Set<String> GLOBAL_DATASTORE_NAMES = Sets.newConcurrentHashSet();
+    public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
+
+    private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
+
+    private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
 
     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
@@ -69,7 +73,6 @@ public class DatastoreContext {
     private boolean persistent = DEFAULT_PERSISTENT;
     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
-    private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
@@ -92,6 +95,7 @@ public class DatastoreContext {
         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
         setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
+        setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
     }
 
     private DatastoreContext(final DatastoreContext other) {
@@ -127,13 +131,14 @@ public class DatastoreContext {
         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
         setTempFileDirectory(other.getTempFileDirectory());
         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
+        setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
     }
 
     public static Builder newBuilder() {
         return new Builder(new DatastoreContext());
     }
 
-    public static Builder newBuilderFrom(DatastoreContext context) {
+    public static Builder newBuilderFrom(final DatastoreContext context) {
         return new Builder(new DatastoreContext(context));
     }
 
@@ -209,7 +214,7 @@ public class DatastoreContext {
         return raftConfig.getTempFileDirectory();
     }
 
-    private void setTempFileDirectory(String tempFileDirectory) {
+    private void setTempFileDirectory(final String tempFileDirectory) {
         raftConfig.setTempFileDirectory(tempFileDirectory);
     }
 
@@ -217,51 +222,55 @@ public class DatastoreContext {
         return raftConfig.getFileBackedStreamingThreshold();
     }
 
-    private void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+    private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
     }
 
-    private void setPeerAddressResolver(PeerAddressResolver resolver) {
+    private void setPeerAddressResolver(final PeerAddressResolver resolver) {
         raftConfig.setPeerAddressResolver(resolver);
     }
 
-    private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis) {
+    private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
                 TimeUnit.MILLISECONDS));
     }
 
-    private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
+    private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
     }
 
 
-    private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
+    private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
         raftConfig.setIsolatedLeaderCheckInterval(
                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
     }
 
-    private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+    private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
     }
 
-    private void setCustomRaftPolicyImplementation(String customRaftPolicyImplementation) {
+    private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
     }
 
-    private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+    private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
         Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
                 && shardSnapshotDataThresholdPercentage <= 100);
         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
     }
 
-    private void setSnapshotBatchCount(long shardSnapshotBatchCount) {
+    private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
-    private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) {
+    private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
         raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
     }
 
+    private void setSyncIndexThreshold(final long syncIndexThreshold) {
+        raftConfig.setSyncIndexThreshold(syncIndexThreshold);
+    }
+
     public int getShardBatchedModificationCount() {
         return shardBatchedModificationCount;
     }
@@ -286,7 +295,7 @@ public class DatastoreContext {
         return raftConfig.getSnapshotChunkSize();
     }
 
-    public static class Builder {
+    public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
         private final DatastoreContext datastoreContext;
         private int maxShardDataChangeExecutorPoolSize =
                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
@@ -297,7 +306,7 @@ public class DatastoreContext {
         private int maxShardDataStoreExecutorQueueSize =
                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
 
-        private Builder(DatastoreContext datastoreContext) {
+        private Builder(final DatastoreContext datastoreContext) {
             this.datastoreContext = datastoreContext;
 
             if (datastoreContext.getDataStoreProperties() != null) {
@@ -312,115 +321,115 @@ public class DatastoreContext {
             }
         }
 
-        public Builder boundedMailboxCapacity(int boundedMailboxCapacity) {
+        public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
             // TODO - this is defined in the yang DataStoreProperties but not currently used.
             return this;
         }
 
-        public Builder enableMetricCapture(boolean enableMetricCapture) {
+        public Builder enableMetricCapture(final boolean enableMetricCapture) {
             // TODO - this is defined in the yang DataStoreProperties but not currently used.
             return this;
         }
 
 
-        public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) {
+        public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
             datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
             return this;
         }
 
-        public Builder shardTransactionIdleTimeoutInMinutes(long timeout) {
+        public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
         }
 
-        public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
+        public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
             return this;
         }
 
-        public Builder operationTimeoutInMillis(long operationTimeoutInMillis) {
+        public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
             return this;
         }
 
-        public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
+        public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
             return this;
         }
 
-        public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
+        public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
             return this;
         }
 
-        public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
+        public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
             return this;
         }
 
-        public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
+        public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
             return this;
         }
 
-        public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+        public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
             return this;
         }
 
-        public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
+        public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
             return this;
         }
 
-        public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
+        public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
             return this;
         }
 
-        public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
+        public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
             return this;
         }
 
-        public Builder shardInitializationTimeoutInSeconds(long timeout) {
+        public Builder shardInitializationTimeoutInSeconds(final long timeout) {
             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
         }
 
-        public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
+        public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
             return this;
         }
 
-        public Builder shardLeaderElectionTimeoutInSeconds(long timeout) {
+        public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
         }
 
-        public Builder configurationReader(AkkaConfigurationReader configurationReader) {
+        public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
             datastoreContext.configurationReader = configurationReader;
             return this;
         }
 
-        public Builder persistent(boolean persistent) {
+        public Builder persistent(final boolean persistent) {
             datastoreContext.persistent = persistent;
             return this;
         }
 
-        public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
+        public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
             return this;
         }
 
-        public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+        public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
             return this;
         }
 
-        public Builder transactionCreationInitialRateLimit(long initialRateLimit) {
+        public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
             return this;
         }
 
-        public Builder logicalStoreType(LogicalDatastoreType logicalStoreType) {
+        public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
             datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
 
             // Retain compatible naming
@@ -443,7 +452,7 @@ public class DatastoreContext {
             return this;
         }
 
-        public Builder dataStoreName(String dataStoreName) {
+        public Builder dataStoreName(final String dataStoreName) {
             datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
             return this;
@@ -454,48 +463,48 @@ public class DatastoreContext {
             return this;
         }
 
-        public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
+        public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
             return this;
         }
 
-        public Builder shardCommitQueueExpiryTimeoutInMillis(long value) {
+        public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
             return this;
         }
 
-        public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) {
+        public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
                     value, TimeUnit.SECONDS);
             return this;
         }
 
-        public Builder transactionDebugContextEnabled(boolean value) {
+        public Builder transactionDebugContextEnabled(final boolean value) {
             datastoreContext.transactionDebugContextEnabled = value;
             return this;
         }
 
-        public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
+        public Builder maxShardDataChangeExecutorPoolSize(final int maxShardDataChangeExecutorPoolSize) {
             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
             return this;
         }
 
-        public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) {
+        public Builder maxShardDataChangeExecutorQueueSize(final int maxShardDataChangeExecutorQueueSize) {
             this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
             return this;
         }
 
-        public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) {
+        public Builder maxShardDataChangeListenerQueueSize(final int maxShardDataChangeListenerQueueSize) {
             this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
             return this;
         }
 
-        public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) {
+        public Builder maxShardDataStoreExecutorQueueSize(final int maxShardDataStoreExecutorQueueSize) {
             this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
             return this;
         }
 
-        public Builder useTellBasedProtocol(boolean value) {
+        public Builder useTellBasedProtocol(final boolean value) {
             datastoreContext.useTellBasedProtocol = value;
             return this;
         }
@@ -504,46 +513,52 @@ public class DatastoreContext {
          * For unit tests only.
          */
         @VisibleForTesting
-        public Builder shardManagerPersistenceId(String id) {
+        public Builder shardManagerPersistenceId(final String id) {
             datastoreContext.shardManagerPersistenceId = id;
             return this;
         }
 
-        public DatastoreContext build() {
-            datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
-                    maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
-                    maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
-
-            if (datastoreContext.dataStoreName != null) {
-                GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
-            }
-
-            return datastoreContext;
-        }
-
-        public Builder customRaftPolicyImplementation(String customRaftPolicyImplementation) {
+        public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
             return this;
         }
 
-        public Builder shardSnapshotChunkSize(int shardSnapshotChunkSize) {
+        public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
             return this;
         }
 
-        public Builder shardPeerAddressResolver(PeerAddressResolver resolver) {
+        public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
             datastoreContext.setPeerAddressResolver(resolver);
             return this;
         }
 
-        public Builder tempFileDirectory(String tempFileDirectory) {
+        public Builder tempFileDirectory(final String tempFileDirectory) {
             datastoreContext.setTempFileDirectory(tempFileDirectory);
             return this;
         }
 
-        public Builder fileBackedStreamingThresholdInMegabytes(int  fileBackedStreamingThreshold) {
+        public Builder fileBackedStreamingThresholdInMegabytes(final int  fileBackedStreamingThreshold) {
             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
             return this;
         }
+
+        public Builder syncIndexThreshold(final long syncIndexThreshold) {
+            datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
+            return this;
+        }
+
+        @Override
+        public DatastoreContext build() {
+            datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
+                    maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
+                    maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
+
+            if (datastoreContext.dataStoreName != null) {
+                GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
+            }
+
+            return datastoreContext;
+        }
     }
 }