X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FDefaultConfigParamsImpl.java;h=c83f90ec430e9d4ec0228b53e65b4cd9d9d726b4;hb=HEAD;hp=56fb63367207b94ad368214a569e2009f8c5648f;hpb=d796a8de8b208ca24bb57aebfc689f8be8bc2c7b;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 56fb633672..c83f90ec43 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -7,12 +7,14 @@ */ package org.opendaylight.controller.cluster.raft; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import com.google.common.base.Strings; -import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; +import java.util.function.Supplier; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.slf4j.Logger; @@ -27,6 +29,10 @@ public class DefaultConfigParamsImpl implements ConfigParams { private static final Logger LOG = LoggerFactory.getLogger(DefaultConfigParamsImpl.class); private static final int SNAPSHOT_BATCH_COUNT = 20000; + /** + * Interval after which a snapshot should be taken during the recovery process. 0 if never. + */ + private static final int RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0; private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000; @@ -35,7 +41,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { */ private static final int ELECTION_TIME_MAX_VARIANCE = 100; - private static final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB + private static final int MAXIMUM_MESSAGE_SLICE_SIZE = 480 * 1024; // 480KiB /** @@ -49,9 +55,12 @@ public class DefaultConfigParamsImpl implements ConfigParams { public static final FiniteDuration HEART_BEAT_INTERVAL = new FiniteDuration(100, TimeUnit.MILLISECONDS); + private final Supplier policySupplier = Suppliers.memoize(this::getPolicy); + private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL; private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT; private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE; + private int recoverySnapshotIntervalSeconds = RECOVERY_SNAPSHOT_INTERVAL_SECONDS; private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis(); private FiniteDuration electionTimeOutInterval; @@ -59,58 +68,76 @@ public class DefaultConfigParamsImpl implements ConfigParams { // in-memory journal can use before it needs to snapshot private int snapshotDataThresholdPercentage = 12; - private int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE; + // max size of in-memory journal in MB + // 0 means direct threshold if disabled + private int snapshotDataThreshold = 0; + + private int maximumMessageSliceSize = MAXIMUM_MESSAGE_SLICE_SIZE; private long electionTimeoutFactor = 2; + private long candidateElectionTimeoutDivisor = 1; private String customRaftPolicyImplementationClass; - private final Supplier policySupplier = Suppliers.memoize(new PolicySupplier()); - private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE; private String tempFileDirectory = ""; private int fileBackedStreamingThreshold = 128 * MEGABYTE; - public void setHeartBeatInterval(FiniteDuration heartBeatInterval) { + private long syncIndexThreshold = 10; + + public void setHeartBeatInterval(final FiniteDuration heartBeatInterval) { this.heartBeatInterval = heartBeatInterval; electionTimeOutInterval = null; } - public void setSnapshotBatchCount(long snapshotBatchCount) { + public void setSnapshotBatchCount(final long snapshotBatchCount) { this.snapshotBatchCount = snapshotBatchCount; } - public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage) { + public void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) { + checkArgument(recoverySnapshotInterval >= 0); + recoverySnapshotIntervalSeconds = recoverySnapshotInterval; + } + + public void setSnapshotDataThresholdPercentage(final int snapshotDataThresholdPercentage) { this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage; } - public void setSnapshotChunkSize(int snapshotChunkSize) { - this.snapshotChunkSize = snapshotChunkSize; + public void setSnapshotDataThreshold(final int snapshotDataThreshold) { + this.snapshotDataThreshold = snapshotDataThreshold; + } + + public void setMaximumMessageSliceSize(final int maximumMessageSliceSize) { + this.maximumMessageSliceSize = maximumMessageSliceSize; } - public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) { + public void setJournalRecoveryLogBatchSize(final int journalRecoveryLogBatchSize) { this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize; } - public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) { + public void setIsolatedLeaderCheckInterval(final FiniteDuration isolatedLeaderCheckInterval) { this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis(); } - public void setElectionTimeoutFactor(long electionTimeoutFactor) { + public void setElectionTimeoutFactor(final long electionTimeoutFactor) { this.electionTimeoutFactor = electionTimeoutFactor; electionTimeOutInterval = null; } - public void setTempFileDirectory(String tempFileDirectory) { + public void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) { + this.candidateElectionTimeoutDivisor = candidateElectionTimeoutDivisor; + } + + public void setTempFileDirectory(final String tempFileDirectory) { this.tempFileDirectory = tempFileDirectory; } - public void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) { + public void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) { this.fileBackedStreamingThreshold = fileBackedStreamingThreshold; } - public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass) { + public void setCustomRaftPolicyImplementationClass(final String customRaftPolicyImplementationClass) { this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass; } @@ -129,6 +156,15 @@ public class DefaultConfigParamsImpl implements ConfigParams { return snapshotDataThresholdPercentage; } + @Override + public int getSnapshotDataThreshold() { + return snapshotDataThreshold; + } + + @Override + public int getRecoverySnapshotIntervalSeconds() { + return recoverySnapshotIntervalSeconds; + } @Override public FiniteDuration getHeartBeatInterval() { @@ -144,14 +180,19 @@ public class DefaultConfigParamsImpl implements ConfigParams { return electionTimeOutInterval; } + @Override + public long getCandidateElectionTimeoutDivisor() { + return candidateElectionTimeoutDivisor; + } + @Override public int getElectionTimeVariance() { return ELECTION_TIME_MAX_VARIANCE; } @Override - public int getSnapshotChunkSize() { - return snapshotChunkSize; + public int getMaximumMessageSliceSize() { + return maximumMessageSliceSize; } @Override @@ -184,37 +225,45 @@ public class DefaultConfigParamsImpl implements ConfigParams { return fileBackedStreamingThreshold; } - private class PolicySupplier implements Supplier { - @Override - @SuppressWarnings("checkstyle:IllegalCatch") - public RaftPolicy get() { - if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) { - LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy"); - return DefaultRaftPolicy.INSTANCE; - } - - try { - String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass; - LOG.info("Trying to use custom RaftPolicy {}", className); - return (RaftPolicy)Class.forName(className).newInstance(); - } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.error("Could not create custom raft policy, will stick with default", e); - } else { - LOG.error("Could not create custom raft policy, will stick with default : cause = {}", - e.getMessage()); - } - } - return DefaultRaftPolicy.INSTANCE; - } - } @Override public PeerAddressResolver getPeerAddressResolver() { return peerAddressResolver; } - public void setPeerAddressResolver(@Nonnull PeerAddressResolver peerAddressResolver) { - this.peerAddressResolver = Preconditions.checkNotNull(peerAddressResolver); + public void setPeerAddressResolver(final @NonNull PeerAddressResolver peerAddressResolver) { + this.peerAddressResolver = requireNonNull(peerAddressResolver); + } + + @Override + public long getSyncIndexThreshold() { + return syncIndexThreshold; + } + + public void setSyncIndexThreshold(final long syncIndexThreshold) { + checkArgument(syncIndexThreshold >= 0); + this.syncIndexThreshold = syncIndexThreshold; + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private RaftPolicy getPolicy() { + if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) { + LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy"); + return DefaultRaftPolicy.INSTANCE; + } + + try { + String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass; + LOG.info("Trying to use custom RaftPolicy {}", className); + return (RaftPolicy)Class.forName(className).getDeclaredConstructor().newInstance(); + } catch (ClassCastException | ReflectiveOperationException e) { + if (LOG.isDebugEnabled()) { + LOG.error("Could not create custom raft policy, will stick with default", e); + } else { + LOG.error("Could not create custom raft policy, will stick with default : cause = {}", + e.getMessage()); + } + } + return DefaultRaftPolicy.INSTANCE; } }