X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FDefaultConfigParamsImpl.java;h=f5f410c75b5f6117bd1f3fea6371d521a971517a;hp=9d06f6360473097beefbbce34962d7433f447f88;hb=38fa2a64bd6e206b2d8a6b153154104347854408;hpb=4019c9fd2ad99628dd790acc1ad4c104f48b6428 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 9d06f63604..f5f410c75b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -7,9 +7,17 @@ */ package org.opendaylight.controller.cluster.raft; -import scala.concurrent.duration.FiniteDuration; - +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; +import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** * Default implementation of the ConfigParams @@ -18,14 +26,18 @@ import java.util.concurrent.TimeUnit; */ public class DefaultConfigParamsImpl implements ConfigParams { + private static final Logger LOG = LoggerFactory.getLogger(DefaultConfigParamsImpl.class); + private static final int SNAPSHOT_BATCH_COUNT = 20000; + private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000; + /** * The maximum election time variance */ private static final int ELECTION_TIME_MAX_VARIANCE = 100; - private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB + private static final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB /** @@ -38,22 +50,87 @@ public class DefaultConfigParamsImpl implements ConfigParams { public static final FiniteDuration HEART_BEAT_INTERVAL = new FiniteDuration(100, TimeUnit.MILLISECONDS); + private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL; + private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT; + private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE; + private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis(); + private FiniteDuration electionTimeOutInterval; + + // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's + // in-memory journal can use before it needs to snapshot + private int snapshotDataThresholdPercentage = 12; + + private int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE; + + private long electionTimeoutFactor = 2; + private String customRaftPolicyImplementationClass; + + private final Supplier policySupplier = Suppliers.memoize(new PolicySupplier()); + + private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE; + + public void setHeartBeatInterval(FiniteDuration heartBeatInterval) { + this.heartBeatInterval = heartBeatInterval; + electionTimeOutInterval = null; + } + + public void setSnapshotBatchCount(long snapshotBatchCount) { + this.snapshotBatchCount = snapshotBatchCount; + } + + public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage){ + this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage; + } + + public void setSnapshotChunkSize(int snapshotChunkSize) { + this.snapshotChunkSize = snapshotChunkSize; + } + + public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) { + this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize; + } + + public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) { + this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis(); + } + + public void setElectionTimeoutFactor(long electionTimeoutFactor){ + this.electionTimeoutFactor = electionTimeoutFactor; + electionTimeOutInterval = null; + } + + public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass){ + this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass; + } + + @Override + public String getCustomRaftPolicyImplementationClass() { + return customRaftPolicyImplementationClass; + } @Override public long getSnapshotBatchCount() { - return SNAPSHOT_BATCH_COUNT; + return snapshotBatchCount; } @Override - public FiniteDuration getHeartBeatInterval() { - return HEART_BEAT_INTERVAL; + public int getSnapshotDataThresholdPercentage() { + return snapshotDataThresholdPercentage; } + @Override + public FiniteDuration getHeartBeatInterval() { + return heartBeatInterval; + } + @Override public FiniteDuration getElectionTimeOutInterval() { - // returns 2 times the heart beat interval - return getHeartBeatInterval().$times(2); + if(electionTimeOutInterval == null) { + electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor); + } + + return electionTimeOutInterval; } @Override @@ -63,6 +140,59 @@ public class DefaultConfigParamsImpl implements ConfigParams { @Override public int getSnapshotChunkSize() { - return SNAPSHOT_CHUNK_SIZE; + return snapshotChunkSize; + } + + @Override + public int getJournalRecoveryLogBatchSize() { + return journalRecoveryLogBatchSize; + } + + @Override + public long getIsolatedCheckIntervalInMillis() { + return isolatedLeaderCheckInterval; + } + + @Override + public long getElectionTimeoutFactor() { + return electionTimeoutFactor; + } + + @Override + public RaftPolicy getRaftPolicy() { + return policySupplier.get(); + } + + private class PolicySupplier implements Supplier{ + @Override + public RaftPolicy get() { + if(Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)){ + LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy"); + return DefaultRaftPolicy.INSTANCE; + } + try { + String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass; + LOG.info("Trying to use custom RaftPolicy {}", className); + Class c = Class.forName(className); + RaftPolicy obj = (RaftPolicy)c.newInstance(); + return obj; + } catch (Exception e) { + if(LOG.isDebugEnabled()) { + LOG.error("Could not create custom raft policy, will stick with default", e); + } else { + LOG.error("Could not create custom raft policy, will stick with default : cause = {}", e.getMessage()); + } + } + return DefaultRaftPolicy.INSTANCE; + } + } + + @Override + public PeerAddressResolver getPeerAddressResolver() { + return peerAddressResolver; + } + + public void setPeerAddressResolver(@Nonnull PeerAddressResolver peerAddressResolver) { + this.peerAddressResolver = Preconditions.checkNotNull(peerAddressResolver); } }