* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
-import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.RequestVote;
-import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
/**
* The behavior of a RaftActor when it is in the Leader state
* of matchIndex[i] ≥ N, and log[N].term == currentTerm:
* set commitIndex = N (§5.3, §5.4).
*/
-public class Leader extends AbstractRaftActorBehavior {
-
- /**
- * The interval at which a heart beat message will be sent to the remote
- * RaftActor
- * <p/>
- * Since this is set to 100 milliseconds the Election timeout should be
- * at least 200 milliseconds
- */
- public static final FiniteDuration HEART_BEAT_INTERVAL =
- new FiniteDuration(100, TimeUnit.MILLISECONDS);
-
- private final Map<String, FollowerLogInformation> followerToLog =
- new HashMap();
-
- private final Map<String, ActorSelection> followerToActor = new HashMap<>();
+public class Leader extends AbstractLeader {
+ private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
+ private final Stopwatch isolatedLeaderCheck;
- private Cancellable heartbeatCancel = null;
-
- public Leader(RaftActorContext context, List<String> followePaths) {
+ public Leader(RaftActorContext context) {
super(context);
+ isolatedLeaderCheck = Stopwatch.createStarted();
+ }
- for (String followerPath : followePaths) {
- FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(followerPath,
- new AtomicLong(0),
- new AtomicLong(0));
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ Preconditions.checkNotNull(sender, "sender should not be null");
- followerToActor.put(followerPath,
- context.actorSelection(followerLogInformation.getId()));
- followerToLog.put(followerPath, followerLogInformation);
+ if (originalMessage instanceof IsolatedLeaderCheck) {
+ if (isLeaderIsolated()) {
+ LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ context.getId(), minIsolatedLeaderPeerCount, leaderId);
+ return switchBehavior(new IsolatedLeader(context));
+ }
}
- // Immediately schedule a heartbeat
- // Upon election: send initial empty AppendEntries RPCs
- // (heartbeat) to each server; repeat during idle periods to
- // prevent election timeouts (§5.2)
- scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
-
-
+ return super.handleMessage(sender, originalMessage);
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState suggestedState) {
- return suggestedState;
- }
+ @Override
+ protected void beforeSendHeartbeat(){
+ if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
+ context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
+ isolatedLeaderCheck.reset().start();
+ }
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
- return suggestedState;
}
- @Override protected RaftState handleRequestVote(ActorRef sender,
- RequestVote requestVote, RaftState suggestedState) {
- return suggestedState;
+ @Override
+ public void close() throws Exception {
+ super.close();
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply, RaftState suggestedState) {
- return suggestedState;
+ @VisibleForTesting
+ void markFollowerActive(String followerId) {
+ getFollower(followerId).markFollowerActive();
}
- @Override protected RaftState state() {
- return RaftState.Leader;
+ @VisibleForTesting
+ void markFollowerInActive(String followerId) {
+ getFollower(followerId).markFollowerInActive();
}
-
- @Override public RaftState handleMessage(ActorRef sender, Object message) {
- Preconditions.checkNotNull(sender, "sender should not be null");
-
- scheduleHeartBeat(HEART_BEAT_INTERVAL);
-
- if (message instanceof SendHeartBeat) {
- for (ActorSelection follower : followerToActor.values()) {
- follower.tell(new AppendEntries(
- context.getTermInformation().getCurrentTerm().get(),
- context.getId(),
- context.getReplicatedLog().last().getIndex(),
- context.getReplicatedLog().last().getTerm(),
- Collections.EMPTY_LIST, context.getCommitIndex().get()),
- context.getActor());
- }
- return state();
- }
- return super.handleMessage(sender, message);
- }
-
- private void scheduleHeartBeat(FiniteDuration interval) {
- if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
- heartbeatCancel.cancel();
- }
-
- // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
- // message is sent to itself.
- // Scheduling the heartbeat only once here because heartbeats do not
- // need to be sent if there are other messages being sent to the remote
- // actor.
- heartbeatCancel =
- context.getActorSystem().scheduler().scheduleOnce(interval,
- context.getActor(), new SendHeartBeat(),
- context.getActorSystem().dispatcher(), context.getActor());
- }
-
}