private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
- private FiniteDuration isolatedLeaderCheckInterval =
- new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
+ private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
// 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
// in-memory journal can use before it needs to snapshot
}
public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
- this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval;
+ this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis();
}
public void setElectionTimeoutFactor(long electionTimeoutFactor){
}
@Override
- public FiniteDuration getIsolatedCheckInterval() {
+ public long getIsolatedCheckIntervalInMillis() {
return isolatedLeaderCheckInterval;
}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
-import akka.actor.Cancellable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
-import scala.concurrent.duration.FiniteDuration;
/**
* The behavior of a RaftActor when it is in the Leader state
* set commitIndex = N (§5.3, §5.4).
*/
public class Leader extends AbstractLeader {
- private Cancellable installSnapshotSchedule = null;
- private Cancellable isolatedLeaderCheckSchedule = null;
+ private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
+ private final Stopwatch isolatedLeaderCheck;
public Leader(RaftActorContext context) {
super(context);
-
- scheduleIsolatedLeaderCheck(
- new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10,
- context.getConfigParams().getHeartBeatInterval().unit()));
+ isolatedLeaderCheck = Stopwatch.createStarted();
}
@Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
if (originalMessage instanceof IsolatedLeaderCheck) {
if (isLeaderIsolated()) {
- LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
context.getId(), minIsolatedLeaderPeerCount, leaderId);
+
return switchBehavior(new IsolatedLeader(context));
}
}
return super.handleMessage(sender, originalMessage);
}
- protected void stopIsolatedLeaderCheckSchedule() {
- if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) {
- isolatedLeaderCheckSchedule.cancel();
+ @Override
+ protected void beforeSendHeartbeat(){
+ if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
+ context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
+ isolatedLeaderCheck.reset().start();
}
- }
- protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) {
- isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval,
- context.getActor(), new IsolatedLeaderCheck(),
- context.getActorSystem().dispatcher(), context.getActor());
}
@Override
public void close() throws Exception {
- stopIsolatedLeaderCheckSchedule();
super.close();
}
assertEquals(DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT, build.getShardLeaderElectionTimeout());
assertEquals(DatastoreContext.DEFAULT_PERSISTENT, build.isPersistent());
assertEquals(DatastoreContext.DEFAULT_CONFIGURATION_READER, build.getConfigurationReader());
- assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckInterval().length());
+ assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());