Bug 7521: Add FileBackedOutputStream and use for snapshot chunking
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / DefaultConfigParamsImpl.java
index a2092234d54134fbe3de765d17041ad766d689c4..56fb63367207b94ad368214a569e2009f8c5648f 100644 (file)
@@ -7,23 +7,31 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import scala.concurrent.duration.FiniteDuration;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
- * Default implementation of the ConfigParams
- *
- * If no implementation is provided for ConfigParams, then this will be used.
+ * Default implementation of the ConfigParams.
  */
 public class DefaultConfigParamsImpl implements ConfigParams {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultConfigParamsImpl.class);
+
     private static final int SNAPSHOT_BATCH_COUNT = 20000;
 
     private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
 
     /**
-     * The maximum election time variance
+     * The maximum election time variance.
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
@@ -32,35 +40,83 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     /**
      * The interval at which a heart beat message will be sent to the remote
-     * RaftActor
-     * <p/>
+     * RaftActor.
+     *
+     * <p>
      * Since this is set to 100 milliseconds the Election timeout should be
      * at least 200 milliseconds
      */
     public static final FiniteDuration HEART_BEAT_INTERVAL =
         new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
-
     private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
-    private FiniteDuration isolatedLeaderCheckInterval =
-        new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
+    private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
+    private FiniteDuration electionTimeOutInterval;
+
+    // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
+    // in-memory journal can use before it needs to snapshot
+    private int snapshotDataThresholdPercentage = 12;
+
+    private int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
+
+    private long electionTimeoutFactor = 2;
+    private String customRaftPolicyImplementationClass;
+
+    private final Supplier<RaftPolicy> policySupplier = Suppliers.memoize(new PolicySupplier());
+
+    private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE;
+
+    private String tempFileDirectory = "";
+
+    private int fileBackedStreamingThreshold = 128 * MEGABYTE;
 
     public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
+        electionTimeOutInterval = null;
     }
 
     public void setSnapshotBatchCount(long snapshotBatchCount) {
         this.snapshotBatchCount = snapshotBatchCount;
     }
 
+    public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage) {
+        this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
+    }
+
+    public void setSnapshotChunkSize(int snapshotChunkSize) {
+        this.snapshotChunkSize = snapshotChunkSize;
+    }
+
     public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
         this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
     }
 
     public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
-        this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval;
+        this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis();
+    }
+
+    public void setElectionTimeoutFactor(long electionTimeoutFactor) {
+        this.electionTimeoutFactor = electionTimeoutFactor;
+        electionTimeOutInterval = null;
+    }
+
+    public void setTempFileDirectory(String tempFileDirectory) {
+        this.tempFileDirectory = tempFileDirectory;
+    }
+
+    public void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+        this.fileBackedStreamingThreshold = fileBackedStreamingThreshold;
+    }
+
+    public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass) {
+        this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass;
+    }
+
+    @Override
+    public String getCustomRaftPolicyImplementationClass() {
+        return customRaftPolicyImplementationClass;
     }
 
     @Override
@@ -68,6 +124,12 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         return snapshotBatchCount;
     }
 
+    @Override
+    public int getSnapshotDataThresholdPercentage() {
+        return snapshotDataThresholdPercentage;
+    }
+
+
     @Override
     public FiniteDuration getHeartBeatInterval() {
         return heartBeatInterval;
@@ -75,8 +137,11 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     @Override
     public FiniteDuration getElectionTimeOutInterval() {
-        // returns 2 times the heart beat interval
-        return getHeartBeatInterval().$times(2);
+        if (electionTimeOutInterval == null) {
+            electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor);
+        }
+
+        return electionTimeOutInterval;
     }
 
     @Override
@@ -86,7 +151,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
 
     @Override
     public int getSnapshotChunkSize() {
-        return SNAPSHOT_CHUNK_SIZE;
+        return snapshotChunkSize;
     }
 
     @Override
@@ -95,7 +160,61 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     }
 
     @Override
-    public FiniteDuration getIsolatedCheckInterval() {
+    public long getIsolatedCheckIntervalInMillis() {
         return isolatedLeaderCheckInterval;
     }
+
+    @Override
+    public long getElectionTimeoutFactor() {
+        return electionTimeoutFactor;
+    }
+
+    @Override
+    public RaftPolicy getRaftPolicy() {
+        return policySupplier.get();
+    }
+
+    @Override
+    public String getTempFileDirectory() {
+        return tempFileDirectory;
+    }
+
+    @Override
+    public int getFileBackedStreamingThreshold() {
+        return fileBackedStreamingThreshold;
+    }
+
+    private class PolicySupplier implements Supplier<RaftPolicy> {
+        @Override
+        @SuppressWarnings("checkstyle:IllegalCatch")
+        public RaftPolicy get() {
+            if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) {
+                LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
+                return DefaultRaftPolicy.INSTANCE;
+            }
+
+            try {
+                String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
+                LOG.info("Trying to use custom RaftPolicy {}", className);
+                return (RaftPolicy)Class.forName(className).newInstance();
+            } catch (Exception e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.error("Could not create custom raft policy, will stick with default", e);
+                } else {
+                    LOG.error("Could not create custom raft policy, will stick with default : cause = {}",
+                            e.getMessage());
+                }
+            }
+            return DefaultRaftPolicy.INSTANCE;
+        }
+    }
+
+    @Override
+    public PeerAddressResolver getPeerAddressResolver() {
+        return peerAddressResolver;
+    }
+
+    public void setPeerAddressResolver(@Nonnull PeerAddressResolver peerAddressResolver) {
+        this.peerAddressResolver = Preconditions.checkNotNull(peerAddressResolver);
+    }
 }