Add optional lz4 compression for snapshots
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DatastoreContext.java
index dee8142bbc0904e6666784143d8f1cb795f27baa..69131a0d1c19a238e0ba57df3786bad3b93caa65 100644 (file)
@@ -5,16 +5,17 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.text.WordUtils;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
@@ -22,12 +23,10 @@ import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationRea
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -35,30 +34,37 @@ import scala.concurrent.duration.FiniteDuration;
  *
  * @author Thomas Pantelis
  */
+// Non-final for mocking
 public class DatastoreContext implements ClientActorConfig {
     public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
 
-    public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
+    public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10,
+        TimeUnit.MINUTES);
     public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000;
     public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
     public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
+    public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
             DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
     public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
+    public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
     public static final boolean DEFAULT_PERSISTENT = true;
+    public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false;
     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
+    public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
+    public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
 
     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
 
@@ -68,20 +74,21 @@ public class DatastoreContext implements ClientActorConfig {
 
     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
 
-    private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
-    private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
+    private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
     private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS;
     private String dataStoreMXBeanType;
     private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
     private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
     private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+    private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
     private boolean persistent = DEFAULT_PERSISTENT;
+    private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE;
     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
-    private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
+    private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty();
     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
     private boolean writeOnlyTransactionOptimizationsEnabled = true;
     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
@@ -92,24 +99,27 @@ public class DatastoreContext implements ClientActorConfig {
     private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
+    private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
+    private boolean useLz4Compression = false;
 
     public static Set<String> getGlobalDatastoreNames() {
         return GLOBAL_DATASTORE_NAMES;
     }
 
-    private DatastoreContext() {
+    DatastoreContext() {
         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
+        setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS);
         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
+        setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
         setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
     }
 
     private DatastoreContext(final DatastoreContext other) {
-        this.dataStoreProperties = other.dataStoreProperties;
         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
         this.dataStoreMXBeanType = other.dataStoreMXBeanType;
@@ -117,7 +127,9 @@ public class DatastoreContext implements ClientActorConfig {
         this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
         this.shardInitializationTimeout = other.shardInitializationTimeout;
         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
+        this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
         this.persistent = other.persistent;
+        this.snapshotOnRootOverwrite = other.snapshotOnRootOverwrite;
         this.configurationReader = other.configurationReader;
         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
         this.dataStoreName = other.dataStoreName;
@@ -132,13 +144,17 @@ public class DatastoreContext implements ClientActorConfig {
         this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
         this.requestTimeout = other.requestTimeout;
         this.noProgressTimeout = other.noProgressTimeout;
+        this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
+        this.useLz4Compression = other.useLz4Compression;
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
+        setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds());
         setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
         setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
+        setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
         setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
@@ -156,11 +172,7 @@ public class DatastoreContext implements ClientActorConfig {
         return new Builder(new DatastoreContext(context));
     }
 
-    public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
-        return dataStoreProperties;
-    }
-
-    public Duration getShardTransactionIdleTimeout() {
+    public FiniteDuration getShardTransactionIdleTimeout() {
         return shardTransactionIdleTimeout;
     }
 
@@ -192,10 +204,24 @@ public class DatastoreContext implements ClientActorConfig {
         return shardLeaderElectionTimeout;
     }
 
+    /**
+     * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards
+     * on the local node to settle.
+     *
+     * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely.
+     */
+    public int getInitialSettleTimeoutMultiplier() {
+        return initialSettleTimeoutMultiplier;
+    }
+
     public boolean isPersistent() {
         return persistent;
     }
 
+    public boolean isSnapshotOnRootOverwrite() {
+        return this.snapshotOnRootOverwrite;
+    }
+
     public AkkaConfigurationReader getConfigurationReader() {
         return configurationReader;
     }
@@ -265,13 +291,16 @@ public class DatastoreContext implements ClientActorConfig {
         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
     }
 
+    private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
+        raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
+    }
+
     private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
     }
 
     private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
-        Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
-                && shardSnapshotDataThresholdPercentage <= 100);
+        checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100);
         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
     }
 
@@ -279,6 +308,14 @@ public class DatastoreContext implements ClientActorConfig {
         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
+    /**
+     * Set the interval in seconds after which a snapshot should be taken during the recovery process.
+     * 0 means don't take snapshots
+     */
+    private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
+        raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
+    }
+
     @Deprecated
     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
         // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
@@ -317,6 +354,10 @@ public class DatastoreContext implements ClientActorConfig {
         return useTellBasedProtocol;
     }
 
+    public boolean isUseLz4Compression() {
+        return useLz4Compression;
+    }
+
     @Override
     public int getMaximumMessageSliceSize() {
         return maximumMessageSliceSize;
@@ -337,30 +378,15 @@ public class DatastoreContext implements ClientActorConfig {
         return noProgressTimeout;
     }
 
+    public int getInitialPayloadSerializedBufferCapacity() {
+        return initialPayloadSerializedBufferCapacity;
+    }
+
     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
         private final DatastoreContext datastoreContext;
-        private int maxShardDataChangeExecutorPoolSize =
-                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
-        private int maxShardDataChangeExecutorQueueSize =
-                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
-        private int maxShardDataChangeListenerQueueSize =
-                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
-        private int maxShardDataStoreExecutorQueueSize =
-                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
-
-        private Builder(final DatastoreContext datastoreContext) {
-            this.datastoreContext = datastoreContext;
 
-            if (datastoreContext.getDataStoreProperties() != null) {
-                maxShardDataChangeExecutorPoolSize =
-                        datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
-                maxShardDataChangeExecutorQueueSize =
-                        datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
-                maxShardDataChangeListenerQueueSize =
-                        datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
-                maxShardDataStoreExecutorQueueSize =
-                        datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
-            }
+        Builder(final DatastoreContext datastoreContext) {
+            this.datastoreContext = datastoreContext;
         }
 
         public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
@@ -375,7 +401,7 @@ public class DatastoreContext implements ClientActorConfig {
 
 
         public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
-            datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
+            datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit);
             return this;
         }
 
@@ -413,6 +439,12 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
+        public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) {
+            checkArgument(recoverySnapshotIntervalSeconds >= 0);
+            datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds);
+            return this;
+        }
+
         public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
             return this;
@@ -442,6 +474,12 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
+        public Builder initialSettleTimeoutMultiplier(final int multiplier) {
+            checkArgument(multiplier >= 0);
+            datastoreContext.initialSettleTimeoutMultiplier = multiplier;
+            return this;
+        }
+
         public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
         }
@@ -456,6 +494,11 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
+        public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) {
+            datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite;
+            return this;
+        }
+
         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
             return this;
@@ -466,13 +509,18 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
+        public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
+            datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor);
+            return this;
+        }
+
         public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
             return this;
         }
 
         public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
-            datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
+            datastoreContext.logicalStoreType = requireNonNull(logicalStoreType);
 
             // Retain compatible naming
             switch (logicalStoreType) {
@@ -495,7 +543,7 @@ public class DatastoreContext implements ClientActorConfig {
         }
 
         public Builder dataStoreName(final String dataStoreName) {
-            datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
+            datastoreContext.dataStoreName = requireNonNull(dataStoreName);
             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
             return this;
         }
@@ -526,23 +574,23 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
-        public Builder maxShardDataChangeExecutorPoolSize(final int maxShardDataChangeExecutorPoolSize) {
-            this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
+        @Deprecated(forRemoval = true)
+        public Builder maxShardDataChangeExecutorPoolSize(final int newMaxShardDataChangeExecutorPoolSize) {
             return this;
         }
 
-        public Builder maxShardDataChangeExecutorQueueSize(final int maxShardDataChangeExecutorQueueSize) {
-            this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
+        @Deprecated(forRemoval = true)
+        public Builder maxShardDataChangeExecutorQueueSize(final int newMaxShardDataChangeExecutorQueueSize) {
             return this;
         }
 
-        public Builder maxShardDataChangeListenerQueueSize(final int maxShardDataChangeListenerQueueSize) {
-            this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
+        @Deprecated(forRemoval = true)
+        public Builder maxShardDataChangeListenerQueueSize(final int newMaxShardDataChangeListenerQueueSize) {
             return this;
         }
 
-        public Builder maxShardDataStoreExecutorQueueSize(final int maxShardDataStoreExecutorQueueSize) {
-            this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
+        @Deprecated(forRemoval = true)
+        public Builder maxShardDataStoreExecutorQueueSize(final int newMaxShardDataStoreExecutorQueueSize) {
             return this;
         }
 
@@ -551,6 +599,11 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
+        public Builder useLz4Compression(final boolean value) {
+            datastoreContext.useLz4Compression = value;
+            return this;
+        }
+
         /**
          * For unit tests only.
          */
@@ -613,12 +666,13 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
+        public Builder initialPayloadSerializedBufferCapacity(final int capacity) {
+            datastoreContext.initialPayloadSerializedBufferCapacity = capacity;
+            return this;
+        }
+
         @Override
         public DatastoreContext build() {
-            datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
-                    maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
-                    maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
-
             if (datastoreContext.dataStoreName != null) {
                 GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
             }