* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftReplicator;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.RequestVote;
-import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
+import scala.concurrent.duration.FiniteDuration;
/**
* The behavior of a RaftActor when it is in the Leader state
- * <p>
+ * <p/>
* Leaders:
* <ul>
* <li> Upon election: send initial empty AppendEntries RPCs
* of matchIndex[i] ≥ N, and log[N].term == currentTerm:
* set commitIndex = N (§5.3, §5.4).
*/
-public class Leader extends AbstractRaftActorBehavior {
-
+public class Leader extends AbstractLeader {
+ private Cancellable installSnapshotSchedule = null;
+ private Cancellable isolatedLeaderCheckSchedule = null;
- private final Map<String, ActorRef> followerToReplicator = new HashMap<>();
-
- public Leader(RaftActorContext context, List<String> followers){
+ public Leader(RaftActorContext context) {
super(context);
- for(String follower : followers) {
+ scheduleInstallSnapshotCheck(context.getConfigParams().getIsolatedCheckInterval());
- ActorRef replicator = context.actorOf(
- RaftReplicator.props(
- new FollowerLogInformationImpl(follower,
- new AtomicLong(0),
- new AtomicLong(0)),
- context.getActor()
- )
- );
+ scheduleIsolatedLeaderCheck(
+ new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10,
+ context.getConfigParams().getHeartBeatInterval().unit()));
+ }
- // Create a replicator for each follower
- followerToReplicator.put(follower, replicator);
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ Preconditions.checkNotNull(sender, "sender should not be null");
+ if (originalMessage instanceof IsolatedLeaderCheck) {
+ if (isLeaderIsolated()) {
+ LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ minIsolatedLeaderPeerCount, leaderId);
+ return switchBehavior(new IsolatedLeader(context));
+ }
}
+ return super.handleMessage(sender, originalMessage);
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState suggestedState) {
- return suggestedState;
+ protected void stopInstallSnapshotSchedule() {
+ if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
+ installSnapshotSchedule.cancel();
+ }
}
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
- return suggestedState;
+ protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+ if (getFollowerIds().isEmpty()) {
+ // Optimization - do not bother scheduling a heartbeat as there are
+ // no followers
+ return;
+ }
+
+ stopInstallSnapshotSchedule();
+
+ // Schedule a message to send append entries to followers that can
+ // accept an append entries with some data in it
+ installSnapshotSchedule =
+ context.getActorSystem().scheduler().scheduleOnce(
+ interval,
+ context.getActor(), new InitiateInstallSnapshot(),
+ context.getActorSystem().dispatcher(), context.getActor());
}
- @Override protected RaftState handleRequestVote(ActorRef sender,
- RequestVote requestVote, RaftState suggestedState) {
- return suggestedState;
+ protected void stopIsolatedLeaderCheckSchedule() {
+ if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) {
+ isolatedLeaderCheckSchedule.cancel();
+ }
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply, RaftState suggestedState) {
- return suggestedState;
+ protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) {
+ isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval,
+ context.getActor(), new IsolatedLeaderCheck(),
+ context.getActorSystem().dispatcher(), context.getActor());
}
- @Override protected RaftState state() {
- return RaftState.Leader;
+ @Override
+ public void close() throws Exception {
+ stopInstallSnapshotSchedule();
+ stopIsolatedLeaderCheckSchedule();
+ super.close();
}
- @Override public RaftState handleMessage(ActorRef sender, Object message) {
- Preconditions.checkNotNull(sender, "sender should not be null");
+ @VisibleForTesting
+ void markFollowerActive(String followerId) {
+ getFollower(followerId).markFollowerActive();
+ }
- if(message instanceof SendHeartBeat) {
- sender.tell(new AppendEntries(
- context.getTermInformation().getCurrentTerm().get() , context.getId(),
- context.getReplicatedLog().last().getIndex(),
- context.getReplicatedLog().last().getTerm(),
- Collections.EMPTY_LIST, context.getCommitIndex().get()), context.getActor());
- return state();
- }
- return super.handleMessage(sender, message);
+ @VisibleForTesting
+ void markFollowerInActive(String followerId) {
+ getFollower(followerId).markFollowerInActive();
}
}