*/
package org.opendaylight.controller.cluster.raft;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.Strings;
+import com.google.common.base.Suppliers;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
/**
- * Default implementation of the ConfigParams
- *
- * If no implementation is provided for ConfigParams, then this will be used.
+ * Default implementation of the ConfigParams.
*/
public class DefaultConfigParamsImpl implements ConfigParams {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultConfigParamsImpl.class);
+
private static final int SNAPSHOT_BATCH_COUNT = 20000;
+ /**
+ * Interval after which a snapshot should be taken during the recovery process. 0 if never.
+ */
+ private static final int RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0;
private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
/**
- * The maximum election time variance
+ * The maximum election time variance.
*/
private static final int ELECTION_TIME_MAX_VARIANCE = 100;
- private static final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
+ private static final int SNAPSHOT_CHUNK_SIZE = 480 * 1024; // 480KiB
/**
* The interval at which a heart beat message will be sent to the remote
- * RaftActor
- * <p/>
+ * RaftActor.
+ *
+ * <p>
* Since this is set to 100 milliseconds the Election timeout should be
* at least 200 milliseconds
*/
public static final FiniteDuration HEART_BEAT_INTERVAL =
new FiniteDuration(100, TimeUnit.MILLISECONDS);
+ private final Supplier<RaftPolicy> policySupplier = Suppliers.memoize(this::getPolicy);
+
private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
+ private int recoverySnapshotIntervalSeconds = RECOVERY_SNAPSHOT_INTERVAL_SECONDS;
private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
private FiniteDuration electionTimeOutInterval;
// in-memory journal can use before it needs to snapshot
private int snapshotDataThresholdPercentage = 12;
+ // max size of in-memory journal in MB
+ // 0 means direct threshold if disabled
+ private int snapshotDataThreshold = 0;
+
+ private int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
+
private long electionTimeoutFactor = 2;
+ private long candidateElectionTimeoutDivisor = 1;
+ private String customRaftPolicyImplementationClass;
- public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
+ private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE;
+
+ private String tempFileDirectory = "";
+
+ private int fileBackedStreamingThreshold = 128 * MEGABYTE;
+
+ private long syncIndexThreshold = 10;
+
+ public void setHeartBeatInterval(final FiniteDuration heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
electionTimeOutInterval = null;
}
- public void setSnapshotBatchCount(long snapshotBatchCount) {
+ public void setSnapshotBatchCount(final long snapshotBatchCount) {
this.snapshotBatchCount = snapshotBatchCount;
}
- public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage){
+ public void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) {
+ checkArgument(recoverySnapshotInterval >= 0);
+ recoverySnapshotIntervalSeconds = recoverySnapshotInterval;
+ }
+
+ public void setSnapshotDataThresholdPercentage(final int snapshotDataThresholdPercentage) {
this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
}
- public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
+ public void setSnapshotDataThreshold(final int snapshotDataThreshold) {
+ this.snapshotDataThreshold = snapshotDataThreshold;
+ }
+
+ public void setSnapshotChunkSize(final int snapshotChunkSize) {
+ this.snapshotChunkSize = snapshotChunkSize;
+ }
+
+ public void setJournalRecoveryLogBatchSize(final int journalRecoveryLogBatchSize) {
this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
}
- public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
+ public void setIsolatedLeaderCheckInterval(final FiniteDuration isolatedLeaderCheckInterval) {
this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis();
}
- public void setElectionTimeoutFactor(long electionTimeoutFactor){
+ public void setElectionTimeoutFactor(final long electionTimeoutFactor) {
this.electionTimeoutFactor = electionTimeoutFactor;
electionTimeOutInterval = null;
}
+ public void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) {
+ this.candidateElectionTimeoutDivisor = candidateElectionTimeoutDivisor;
+ }
+
+ public void setTempFileDirectory(final String tempFileDirectory) {
+ this.tempFileDirectory = tempFileDirectory;
+ }
+
+ public void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
+ this.fileBackedStreamingThreshold = fileBackedStreamingThreshold;
+ }
+
+ public void setCustomRaftPolicyImplementationClass(final String customRaftPolicyImplementationClass) {
+ this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass;
+ }
+
+ @Override
+ public String getCustomRaftPolicyImplementationClass() {
+ return customRaftPolicyImplementationClass;
+ }
+
@Override
public long getSnapshotBatchCount() {
return snapshotBatchCount;
return snapshotDataThresholdPercentage;
}
+ @Override
+ public int getSnapshotDataThreshold() {
+ return snapshotDataThreshold;
+ }
+
+ @Override
+ public int getRecoverySnapshotIntervalSeconds() {
+ return recoverySnapshotIntervalSeconds;
+ }
@Override
public FiniteDuration getHeartBeatInterval() {
@Override
public FiniteDuration getElectionTimeOutInterval() {
- if(electionTimeOutInterval == null) {
+ if (electionTimeOutInterval == null) {
electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor);
}
return electionTimeOutInterval;
}
+ @Override
+ public long getCandidateElectionTimeoutDivisor() {
+ return candidateElectionTimeoutDivisor;
+ }
+
@Override
public int getElectionTimeVariance() {
return ELECTION_TIME_MAX_VARIANCE;
@Override
public int getSnapshotChunkSize() {
- return SNAPSHOT_CHUNK_SIZE;
+ return snapshotChunkSize;
}
@Override
public long getElectionTimeoutFactor() {
return electionTimeoutFactor;
}
+
+ @Override
+ public RaftPolicy getRaftPolicy() {
+ return policySupplier.get();
+ }
+
+ @Override
+ public String getTempFileDirectory() {
+ return tempFileDirectory;
+ }
+
+ @Override
+ public int getFileBackedStreamingThreshold() {
+ return fileBackedStreamingThreshold;
+ }
+
+
+ @Override
+ public PeerAddressResolver getPeerAddressResolver() {
+ return peerAddressResolver;
+ }
+
+ public void setPeerAddressResolver(final @NonNull PeerAddressResolver peerAddressResolver) {
+ this.peerAddressResolver = requireNonNull(peerAddressResolver);
+ }
+
+ @Override
+ public long getSyncIndexThreshold() {
+ return syncIndexThreshold;
+ }
+
+ public void setSyncIndexThreshold(final long syncIndexThreshold) {
+ checkArgument(syncIndexThreshold >= 0);
+ this.syncIndexThreshold = syncIndexThreshold;
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private RaftPolicy getPolicy() {
+ if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) {
+ LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
+ return DefaultRaftPolicy.INSTANCE;
+ }
+
+ try {
+ String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
+ LOG.info("Trying to use custom RaftPolicy {}", className);
+ return (RaftPolicy)Class.forName(className).getDeclaredConstructor().newInstance();
+ } catch (ClassCastException | ReflectiveOperationException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.error("Could not create custom raft policy, will stick with default", e);
+ } else {
+ LOG.error("Could not create custom raft policy, will stick with default : cause = {}",
+ e.getMessage());
+ }
+ }
+ return DefaultRaftPolicy.INSTANCE;
+ }
}