X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FDefaultConfigParamsImpl.java;h=56fb63367207b94ad368214a569e2009f8c5648f;hb=bfe4439155b27fbf9ae300252420c8a81fcbdb80;hp=a2092234d54134fbe3de765d17041ad766d689c4;hpb=8cf40f4741c70a760dadb4300946c1dc88f95611;p=controller.git
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
index a2092234d5..56fb633672 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
@@ -7,23 +7,31 @@
*/
package org.opendaylight.controller.cluster.raft;
-import scala.concurrent.duration.FiniteDuration;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
/**
- * Default implementation of the ConfigParams
- *
- * If no implementation is provided for ConfigParams, then this will be used.
+ * Default implementation of the ConfigParams.
*/
public class DefaultConfigParamsImpl implements ConfigParams {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultConfigParamsImpl.class);
+
private static final int SNAPSHOT_BATCH_COUNT = 20000;
private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
/**
- * The maximum election time variance
+ * The maximum election time variance.
*/
private static final int ELECTION_TIME_MAX_VARIANCE = 100;
@@ -32,35 +40,83 @@ public class DefaultConfigParamsImpl implements ConfigParams {
/**
* The interval at which a heart beat message will be sent to the remote
- * RaftActor
- *
+ * RaftActor.
+ *
+ *
* Since this is set to 100 milliseconds the Election timeout should be
* at least 200 milliseconds
*/
public static final FiniteDuration HEART_BEAT_INTERVAL =
new FiniteDuration(100, TimeUnit.MILLISECONDS);
-
private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
- private FiniteDuration isolatedLeaderCheckInterval =
- new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
+ private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
+ private FiniteDuration electionTimeOutInterval;
+
+ // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
+ // in-memory journal can use before it needs to snapshot
+ private int snapshotDataThresholdPercentage = 12;
+
+ private int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
+
+ private long electionTimeoutFactor = 2;
+ private String customRaftPolicyImplementationClass;
+
+ private final Supplier policySupplier = Suppliers.memoize(new PolicySupplier());
+
+ private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE;
+
+ private String tempFileDirectory = "";
+
+ private int fileBackedStreamingThreshold = 128 * MEGABYTE;
public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
+ electionTimeOutInterval = null;
}
public void setSnapshotBatchCount(long snapshotBatchCount) {
this.snapshotBatchCount = snapshotBatchCount;
}
+ public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage) {
+ this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
+ }
+
+ public void setSnapshotChunkSize(int snapshotChunkSize) {
+ this.snapshotChunkSize = snapshotChunkSize;
+ }
+
public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
}
public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
- this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval;
+ this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis();
+ }
+
+ public void setElectionTimeoutFactor(long electionTimeoutFactor) {
+ this.electionTimeoutFactor = electionTimeoutFactor;
+ electionTimeOutInterval = null;
+ }
+
+ public void setTempFileDirectory(String tempFileDirectory) {
+ this.tempFileDirectory = tempFileDirectory;
+ }
+
+ public void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+ this.fileBackedStreamingThreshold = fileBackedStreamingThreshold;
+ }
+
+ public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass) {
+ this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass;
+ }
+
+ @Override
+ public String getCustomRaftPolicyImplementationClass() {
+ return customRaftPolicyImplementationClass;
}
@Override
@@ -68,6 +124,12 @@ public class DefaultConfigParamsImpl implements ConfigParams {
return snapshotBatchCount;
}
+ @Override
+ public int getSnapshotDataThresholdPercentage() {
+ return snapshotDataThresholdPercentage;
+ }
+
+
@Override
public FiniteDuration getHeartBeatInterval() {
return heartBeatInterval;
@@ -75,8 +137,11 @@ public class DefaultConfigParamsImpl implements ConfigParams {
@Override
public FiniteDuration getElectionTimeOutInterval() {
- // returns 2 times the heart beat interval
- return getHeartBeatInterval().$times(2);
+ if (electionTimeOutInterval == null) {
+ electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor);
+ }
+
+ return electionTimeOutInterval;
}
@Override
@@ -86,7 +151,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
@Override
public int getSnapshotChunkSize() {
- return SNAPSHOT_CHUNK_SIZE;
+ return snapshotChunkSize;
}
@Override
@@ -95,7 +160,61 @@ public class DefaultConfigParamsImpl implements ConfigParams {
}
@Override
- public FiniteDuration getIsolatedCheckInterval() {
+ public long getIsolatedCheckIntervalInMillis() {
return isolatedLeaderCheckInterval;
}
+
+ @Override
+ public long getElectionTimeoutFactor() {
+ return electionTimeoutFactor;
+ }
+
+ @Override
+ public RaftPolicy getRaftPolicy() {
+ return policySupplier.get();
+ }
+
+ @Override
+ public String getTempFileDirectory() {
+ return tempFileDirectory;
+ }
+
+ @Override
+ public int getFileBackedStreamingThreshold() {
+ return fileBackedStreamingThreshold;
+ }
+
+ private class PolicySupplier implements Supplier {
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public RaftPolicy get() {
+ if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) {
+ LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
+ return DefaultRaftPolicy.INSTANCE;
+ }
+
+ try {
+ String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
+ LOG.info("Trying to use custom RaftPolicy {}", className);
+ return (RaftPolicy)Class.forName(className).newInstance();
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.error("Could not create custom raft policy, will stick with default", e);
+ } else {
+ LOG.error("Could not create custom raft policy, will stick with default : cause = {}",
+ e.getMessage());
+ }
+ }
+ return DefaultRaftPolicy.INSTANCE;
+ }
+ }
+
+ @Override
+ public PeerAddressResolver getPeerAddressResolver() {
+ return peerAddressResolver;
+ }
+
+ public void setPeerAddressResolver(@Nonnull PeerAddressResolver peerAddressResolver) {
+ this.peerAddressResolver = Preconditions.checkNotNull(peerAddressResolver);
+ }
}