Allow incremental recovery
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / DefaultConfigParamsImpl.java
index 46949da17e445d3e63345b47b1f7a74de1baf7c3..97838b03212f8ea8b3fbd6201995b398a9b61977 100644 (file)
@@ -7,12 +7,14 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
+import java.util.function.Supplier;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.slf4j.Logger;
@@ -20,20 +22,22 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
- * Default implementation of the ConfigParams
- *
- * If no implementation is provided for ConfigParams, then this will be used.
+ * Default implementation of the ConfigParams.
  */
 public class DefaultConfigParamsImpl implements ConfigParams {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultConfigParamsImpl.class);
 
     private static final int SNAPSHOT_BATCH_COUNT = 20000;
+    /**
+     * Interval after which a snapshot should be taken during the recovery process. 0 if never.
+     */
+    private static final int RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
 
     private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
 
     /**
-     * The maximum election time variance
+     * The maximum election time variance.
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
@@ -42,17 +46,21 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     /**
      * The interval at which a heart beat message will be sent to the remote
-     * RaftActor
-     * <p/>
+     * RaftActor.
+     *
+     * <p>
      * Since this is set to 100 milliseconds the Election timeout should be
      * at least 200 milliseconds
      */
     public static final FiniteDuration HEART_BEAT_INTERVAL =
         new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
+    private final Supplier<RaftPolicy> policySupplier = Suppliers.memoize(this::getPolicy);
+
     private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
+    private int recoverySnapshotIntervalSeconds = RECOVERY_SNAPSHOT_INTERVAL_SECONDS;
     private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
     private FiniteDuration electionTimeOutInterval;
 
@@ -63,43 +71,65 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
 
     private long electionTimeoutFactor = 2;
+    private long candidateElectionTimeoutDivisor = 1;
     private String customRaftPolicyImplementationClass;
 
-    private final Supplier<RaftPolicy> policySupplier = Suppliers.memoize(new PolicySupplier());
-
     private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE;
 
-    public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
+    private String tempFileDirectory = "";
+
+    private int fileBackedStreamingThreshold = 128 * MEGABYTE;
+
+    private long syncIndexThreshold = 10;
+
+    public void setHeartBeatInterval(final FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
         electionTimeOutInterval = null;
     }
 
-    public void setSnapshotBatchCount(long snapshotBatchCount) {
+    public void setSnapshotBatchCount(final long snapshotBatchCount) {
         this.snapshotBatchCount = snapshotBatchCount;
     }
 
-    public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage){
+    public void setRecoverySnapshotIntervalSeconds(int recoverySnapshotInterval) {
+        checkArgument(recoverySnapshotInterval >= 0);
+        this.recoverySnapshotIntervalSeconds = recoverySnapshotInterval;
+    }
+
+    public void setSnapshotDataThresholdPercentage(final int snapshotDataThresholdPercentage) {
         this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
     }
 
-    public void setSnapshotChunkSize(int snapshotChunkSize) {
+    public void setSnapshotChunkSize(final int snapshotChunkSize) {
         this.snapshotChunkSize = snapshotChunkSize;
     }
 
-    public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
+    public void setJournalRecoveryLogBatchSize(final int journalRecoveryLogBatchSize) {
         this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
     }
 
-    public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
+    public void setIsolatedLeaderCheckInterval(final FiniteDuration isolatedLeaderCheckInterval) {
         this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis();
     }
 
-    public void setElectionTimeoutFactor(long electionTimeoutFactor){
+    public void setElectionTimeoutFactor(final long electionTimeoutFactor) {
         this.electionTimeoutFactor = electionTimeoutFactor;
         electionTimeOutInterval = null;
     }
 
-    public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass){
+    public void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
+        this.candidateElectionTimeoutDivisor = candidateElectionTimeoutDivisor;
+    }
+
+    public void setTempFileDirectory(final String tempFileDirectory) {
+        this.tempFileDirectory = tempFileDirectory;
+    }
+
+    public void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
+        this.fileBackedStreamingThreshold = fileBackedStreamingThreshold;
+    }
+
+    public void setCustomRaftPolicyImplementationClass(final String customRaftPolicyImplementationClass) {
         this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass;
     }
 
@@ -118,6 +148,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         return snapshotDataThresholdPercentage;
     }
 
+    @Override
+    public int getRecoverySnapshotIntervalSeconds() {
+        return this.recoverySnapshotIntervalSeconds;
+    }
 
     @Override
     public FiniteDuration getHeartBeatInterval() {
@@ -126,13 +160,18 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     @Override
     public FiniteDuration getElectionTimeOutInterval() {
-        if(electionTimeOutInterval == null) {
+        if (electionTimeOutInterval == null) {
             electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor);
         }
 
         return electionTimeOutInterval;
     }
 
+    @Override
+    public long getCandidateElectionTimeoutDivisor() {
+        return candidateElectionTimeoutDivisor;
+    }
+
     @Override
     public int getElectionTimeVariance() {
         return ELECTION_TIME_MAX_VARIANCE;
@@ -163,36 +202,55 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         return policySupplier.get();
     }
 
-    private class PolicySupplier implements Supplier<RaftPolicy>{
-        @Override
-        public RaftPolicy get() {
-            if(Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)){
-                LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
-                return DefaultRaftPolicy.INSTANCE;
-            }
-            try {
-                String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
-                LOG.info("Trying to use custom RaftPolicy {}", className);
-                Class c = Class.forName(className);
-                RaftPolicy obj = (RaftPolicy)c.newInstance();
-                return obj;
-            } catch (Exception e) {
-                if(LOG.isDebugEnabled()) {
-                    LOG.error("Could not create custom raft policy, will stick with default", e);
-                } else {
-                    LOG.error("Could not create custom raft policy, will stick with default : cause = {}", e.getMessage());
-                }
-            }
-            return DefaultRaftPolicy.INSTANCE;
-        }
+    @Override
+    public String getTempFileDirectory() {
+        return tempFileDirectory;
     }
 
+    @Override
+    public int getFileBackedStreamingThreshold() {
+        return fileBackedStreamingThreshold;
+    }
+
+
     @Override
     public PeerAddressResolver getPeerAddressResolver() {
         return peerAddressResolver;
     }
 
-    public void setPeerAddressResolver(@Nonnull PeerAddressResolver peerAddressResolver) {
-        this.peerAddressResolver = Preconditions.checkNotNull(peerAddressResolver);
+    public void setPeerAddressResolver(final @NonNull PeerAddressResolver peerAddressResolver) {
+        this.peerAddressResolver = requireNonNull(peerAddressResolver);
+    }
+
+    @Override
+    public long getSyncIndexThreshold() {
+        return syncIndexThreshold;
+    }
+
+    public void setSyncIndexThreshold(final long syncIndexThreshold) {
+        checkArgument(syncIndexThreshold >= 0);
+        this.syncIndexThreshold = syncIndexThreshold;
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private RaftPolicy getPolicy() {
+        if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) {
+            LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
+            return DefaultRaftPolicy.INSTANCE;
+        }
+
+        try {
+            String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
+            LOG.info("Trying to use custom RaftPolicy {}", className);
+            return (RaftPolicy)Class.forName(className).getDeclaredConstructor().newInstance();
+        } catch (ClassCastException | ReflectiveOperationException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.error("Could not create custom raft policy, will stick with default", e);
+            } else {
+                LOG.error("Could not create custom raft policy, will stick with default : cause = {}",
+                    e.getMessage());
+            }
+        }
+        return DefaultRaftPolicy.INSTANCE;
     }
 }